Migrate libevent task queue implementation to TaskQueueBase interface

Bug: webrtc:10191
Change-Id: I480da22f6db781e877dcb92d46ce7f445892df6a
Reviewed-on: https://webrtc-review.googlesource.com/c/118929
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26644}
diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn
index 8884bd3..ebb7e39 100644
--- a/api/task_queue/BUILD.gn
+++ b/api/task_queue/BUILD.gn
@@ -72,7 +72,31 @@
   # TODO(bugs.webrtc.org/10284): Include implementation unconditionally when
   # global task queue factory is removed.
   if (rtc_link_task_queue_impl) {
-    sources += [ "default_task_queue_factory.cc" ]
+    deps += [ ":default_task_queue_factory_impl" ]
+  }
+}
+
+# TODO(bugs.webrtc.org/10191): Merge back to default_task_queue_factory when
+# rtc_task_queue_impl build target is removed.
+rtc_source_set("default_task_queue_factory_impl") {
+  # Include the implementation when rtc_link_task_queue_impl is set to default
+  # value of true or when explicit dependency on "rtc_task_queue_impl" is added.
+  visibility = [
+    ":default_task_queue_factory",
+    "../../rtc_base:rtc_task_queue_impl",
+  ]
+  deps = [
+    ":task_queue_factory",
+  ]
+  if (rtc_enable_libevent) {
+    sources = [
+      "default_task_queue_factory_libevent.cc",
+    ]
+    deps += [ "../../rtc_base:rtc_task_queue_libevent" ]
+  } else {
+    sources = [
+      "default_task_queue_factory_unimplemented.cc",
+    ]
     deps += [ "../../rtc_base:checks" ]
   }
 }
diff --git a/api/task_queue/default_task_queue_factory.cc b/api/task_queue/default_task_queue_factory_libevent.cc
similarity index 67%
copy from api/task_queue/default_task_queue_factory.cc
copy to api/task_queue/default_task_queue_factory_libevent.cc
index b3e86bb..f2fb418 100644
--- a/api/task_queue/default_task_queue_factory.cc
+++ b/api/task_queue/default_task_queue_factory_libevent.cc
@@ -7,16 +7,15 @@
  *  in the file PATENTS.  All contributing project authors may
  *  be found in the AUTHORS file in the root of the source tree.
  */
-#include "api/task_queue/default_task_queue_factory.h"
+#include <memory>
 
-#include "rtc_base/checks.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "rtc_base/task_queue_libevent.h"
 
 namespace webrtc {
 
 std::unique_ptr<TaskQueueFactory> CreateDefaultTaskQueueFactory() {
-  RTC_CHECK(false)
-      << "Default task queue is not implemented for current platform, "
-         "overwrite the task queue implementation by setting global factory.";
+  return CreateTaskQueueLibeventFactory();
 }
 
 }  // namespace webrtc
diff --git a/api/task_queue/default_task_queue_factory.cc b/api/task_queue/default_task_queue_factory_unimplemented.cc
similarity index 91%
rename from api/task_queue/default_task_queue_factory.cc
rename to api/task_queue/default_task_queue_factory_unimplemented.cc
index b3e86bb..d4020f8 100644
--- a/api/task_queue/default_task_queue_factory.cc
+++ b/api/task_queue/default_task_queue_factory_unimplemented.cc
@@ -7,8 +7,9 @@
  *  in the file PATENTS.  All contributing project authors may
  *  be found in the AUTHORS file in the root of the source tree.
  */
-#include "api/task_queue/default_task_queue_factory.h"
+#include <memory>
 
+#include "api/task_queue/task_queue_factory.h"
 #include "rtc_base/checks.h"
 
 namespace webrtc {
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 4462381..c152533 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -507,11 +507,10 @@
 
 if (rtc_enable_libevent) {
   rtc_source_set("rtc_task_queue_libevent") {
-    visibility = [ ":rtc_task_queue_impl" ]
+    visibility = [ "../api/task_queue:default_task_queue_factory_impl" ]
     sources = [
       "task_queue_libevent.cc",
-      "task_queue_posix.cc",
-      "task_queue_posix.h",
+      "task_queue_libevent.h",
     ]
     deps = [
       ":checks",
@@ -520,12 +519,12 @@
       ":macromagic",
       ":platform_thread",
       ":platform_thread_types",
-      ":refcount",
-      ":rtc_task_queue_api",
       ":safe_conversions",
       ":timeutils",
-      "../api:scoped_refptr",
-      "system:unused",
+      "../api/task_queue",
+      "../api/task_queue:task_queue_factory",
+      "//third_party/abseil-cpp/absl/memory",
+      "//third_party/abseil-cpp/absl/strings",
     ]
     if (rtc_build_libevent) {
       deps += [ "//base/third_party/libevent" ]
@@ -597,7 +596,8 @@
   visibility = [ "*" ]
   if (rtc_enable_libevent) {
     deps = [
-      ":rtc_task_queue_libevent",
+      "../api/task_queue:default_task_queue_factory_impl",
+      "../api/task_queue:global_task_queue_factory",
     ]
   } else {
     if (is_mac || is_ios) {
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index 5ff177c..38704c8 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -8,7 +8,7 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
-#include "rtc_base/task_queue.h"
+#include "rtc_base/task_queue_libevent.h"
 
 #include <errno.h>
 #include <fcntl.h>
@@ -22,7 +22,10 @@
 #include <type_traits>
 #include <utility>
 
-#include "api/scoped_refptr.h"
+#include "absl/memory/memory.h"
+#include "absl/strings/string_view.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
 #include "base/third_party/libevent/event.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/critical_section.h"
@@ -30,22 +33,15 @@
 #include "rtc_base/numerics/safe_conversions.h"
 #include "rtc_base/platform_thread.h"
 #include "rtc_base/platform_thread_types.h"
-#include "rtc_base/ref_count.h"
-#include "rtc_base/ref_counted_object.h"
-#include "rtc_base/system/unused.h"
-#include "rtc_base/task_queue_posix.h"
 #include "rtc_base/thread_annotations.h"
 #include "rtc_base/time_utils.h"
 
-namespace rtc {
-using internal::GetQueuePtrTls;
-using internal::AutoSetCurrentQueuePtr;
-
+namespace webrtc {
 namespace {
-static const char kQuit = 1;
-static const char kRunTask = 2;
+constexpr char kQuit = 1;
+constexpr char kRunTask = 2;
 
-using Priority = TaskQueue::Priority;
+using Priority = TaskQueueFactory::Priority;
 
 // This ignores the SIGPIPE signal on the calling thread.
 // This signal can be fired when trying to write() to a pipe that's being
@@ -67,14 +63,6 @@
   pthread_sigmask(SIG_BLOCK, &sigpipe_mask, nullptr);
 }
 
-struct TimerEvent {
-  explicit TimerEvent(std::unique_ptr<QueuedTask> task)
-      : task(std::move(task)) {}
-  ~TimerEvent() { event_del(&ev); }
-  event ev;
-  std::unique_ptr<QueuedTask> task;
-};
-
 bool SetNonBlocking(int fd) {
   const int flags = fcntl(fd, F_GETFL);
   RTC_CHECK(flags != -1);
@@ -101,78 +89,76 @@
 #endif
 }
 
-ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+rtc::ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
   switch (priority) {
     case Priority::HIGH:
-      return kRealtimePriority;
+      return rtc::kRealtimePriority;
     case Priority::LOW:
-      return kLowPriority;
+      return rtc::kLowPriority;
     case Priority::NORMAL:
-      return kNormalPriority;
+      return rtc::kNormalPriority;
     default:
       RTC_NOTREACHED();
       break;
   }
-  return kNormalPriority;
+  return rtc::kNormalPriority;
 }
-}  // namespace
 
-class TaskQueue::Impl : public RefCountInterface {
+class TaskQueueLibevent final : public TaskQueueBase {
  public:
-  explicit Impl(const char* queue_name,
-                TaskQueue* queue,
-                Priority priority = Priority::NORMAL);
-  ~Impl() override;
+  TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
 
-  static TaskQueue::Impl* Current();
-  static TaskQueue* CurrentQueue();
-
-  // Used for DCHECKing the current queue.
-  bool IsCurrent() const;
-
-  void PostTask(std::unique_ptr<QueuedTask> task);
-  void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+  void Delete() override;
+  void PostTask(std::unique_ptr<QueuedTask> task) override;
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                       uint32_t milliseconds) override;
 
  private:
+  class SetTimerTask;
+  struct TimerEvent;
+
+  ~TaskQueueLibevent() override = default;
+
   static void ThreadMain(void* context);
   static void OnWakeup(int socket, short flags, void* context);  // NOLINT
   static void RunTask(int fd, short flags, void* context);       // NOLINT
   static void RunTimer(int fd, short flags, void* context);      // NOLINT
 
-  class SetTimerTask;
-
-  struct QueueContext;
-  TaskQueue* const queue_;
+  bool is_active_ = true;
   int wakeup_pipe_in_ = -1;
   int wakeup_pipe_out_ = -1;
   event_base* event_base_;
-  std::unique_ptr<event> wakeup_event_;
-  PlatformThread thread_;
+  event wakeup_event_;
+  rtc::PlatformThread thread_;
   rtc::CriticalSection pending_lock_;
   std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
-};
-
-struct TaskQueue::Impl::QueueContext {
-  explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
-  TaskQueue::Impl* queue;
-  bool is_active;
   // Holds a list of events pending timers for cleanup when the loop exits.
   std::list<TimerEvent*> pending_timers_;
 };
 
-class TaskQueue::Impl::SetTimerTask : public QueuedTask {
+struct TaskQueueLibevent::TimerEvent {
+  TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
+      : task_queue(task_queue), task(std::move(task)) {}
+  ~TimerEvent() { event_del(&ev); }
+
+  event ev;
+  TaskQueueLibevent* task_queue;
+  std::unique_ptr<QueuedTask> task;
+};
+
+class TaskQueueLibevent::SetTimerTask : public QueuedTask {
  public:
   SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
       : task_(std::move(task)),
         milliseconds_(milliseconds),
-        posted_(Time32()) {}
+        posted_(rtc::Time32()) {}
 
  private:
   bool Run() override {
     // Compensate for the time that has passed since construction
     // and until we got here.
-    uint32_t post_time = Time32() - posted_;
-    TaskQueue::Impl::Current()->PostDelayedTask(
+    uint32_t post_time = rtc::Time32() - posted_;
+    TaskQueueLibevent::Current()->PostDelayedTask(
         std::move(task_),
         post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
     return true;
@@ -183,17 +169,10 @@
   const uint32_t posted_;
 };
 
-TaskQueue::Impl::Impl(const char* queue_name,
-                      TaskQueue* queue,
-                      Priority priority /*= NORMAL*/)
-    : queue_(queue),
-      event_base_(event_base_new()),
-      wakeup_event_(new event()),
-      thread_(&TaskQueue::Impl::ThreadMain,
-              this,
-              queue_name,
-              TaskQueuePriorityToThreadPriority(priority)) {
-  RTC_DCHECK(queue_name);
+TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
+                                     rtc::ThreadPriority priority)
+    : event_base_(event_base_new()),
+      thread_(&TaskQueueLibevent::ThreadMain, this, queue_name, priority) {
   int fds[2];
   RTC_CHECK(pipe(fds) == 0);
   SetNonBlocking(fds[0]);
@@ -201,13 +180,13 @@
   wakeup_pipe_out_ = fds[0];
   wakeup_pipe_in_ = fds[1];
 
-  EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
+  EventAssign(&wakeup_event_, event_base_, wakeup_pipe_out_,
               EV_READ | EV_PERSIST, OnWakeup, this);
-  event_add(wakeup_event_.get(), 0);
+  event_add(&wakeup_event_, 0);
   thread_.Start();
 }
 
-TaskQueue::Impl::~Impl() {
+void TaskQueueLibevent::Delete() {
   RTC_DCHECK(!IsCurrent());
   struct timespec ts;
   char message = kQuit;
@@ -221,7 +200,7 @@
 
   thread_.Stop();
 
-  event_del(wakeup_event_.get());
+  event_del(&wakeup_event_);
 
   IgnoreSigPipeSignalOnCurrentThread();
 
@@ -231,48 +210,30 @@
   wakeup_pipe_out_ = -1;
 
   event_base_free(event_base_);
+  delete this;
 }
 
-// static
-TaskQueue::Impl* TaskQueue::Impl::Current() {
-  QueueContext* ctx =
-      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
-  return ctx ? ctx->queue : nullptr;
-}
-
-// static
-TaskQueue* TaskQueue::Impl::CurrentQueue() {
-  TaskQueue::Impl* current = Current();
-  if (current) {
-    return current->queue_;
-  }
-  return nullptr;
-}
-
-bool TaskQueue::Impl::IsCurrent() const {
-  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
   RTC_DCHECK(task.get());
   // libevent isn't thread safe.  This means that we can't use methods such
   // as event_base_once to post tasks to the worker thread from a different
   // thread.  However, we can use it when posting from the worker thread itself.
   if (IsCurrent()) {
-    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
-                        task.get(), nullptr) == 0) {
+    if (event_base_once(event_base_, -1, EV_TIMEOUT,
+                        &TaskQueueLibevent::RunTask, task.get(),
+                        nullptr) == 0) {
       task.release();
     }
   } else {
     QueuedTask* task_id = task.get();  // Only used for comparison.
     {
-      CritScope lock(&pending_lock_);
+      rtc::CritScope lock(&pending_lock_);
       pending_.push_back(std::move(task));
     }
     char message = kRunTask;
     if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
       RTC_LOG(WARNING) << "Failed to queue task.";
-      CritScope lock(&pending_lock_);
+      rtc::CritScope lock(&pending_lock_);
       pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
         return t.get() == task_id;
       });
@@ -280,61 +241,55 @@
   }
 }
 
-void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                      uint32_t milliseconds) {
+void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                        uint32_t milliseconds) {
   if (IsCurrent()) {
-    TimerEvent* timer = new TimerEvent(std::move(task));
-    EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
+    TimerEvent* timer = new TimerEvent(this, std::move(task));
+    EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
                 timer);
-    QueueContext* ctx =
-        static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
-    ctx->pending_timers_.push_back(timer);
+    pending_timers_.push_back(timer);
     timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
                   rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
     event_add(&timer->ev, &tv);
   } else {
-    PostTask(std::unique_ptr<QueuedTask>(
-        new SetTimerTask(std::move(task), milliseconds)));
+    PostTask(absl::make_unique<SetTimerTask>(std::move(task), milliseconds));
   }
 }
 
 // static
-void TaskQueue::Impl::ThreadMain(void* context) {
-  TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
+void TaskQueueLibevent::ThreadMain(void* context) {
+  TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
 
-  QueueContext queue_context(me);
-  pthread_setspecific(GetQueuePtrTls(), &queue_context);
+  {
+    CurrentTaskQueueSetter set_current(me);
+    while (me->is_active_)
+      event_base_loop(me->event_base_, 0);
+  }
 
-  while (queue_context.is_active)
-    event_base_loop(me->event_base_, 0);
-
-  pthread_setspecific(GetQueuePtrTls(), nullptr);
-
-  for (TimerEvent* timer : queue_context.pending_timers_)
+  for (TimerEvent* timer : me->pending_timers_)
     delete timer;
 }
 
 // static
-void TaskQueue::Impl::OnWakeup(int socket,
-                               short flags,
-                               void* context) {  // NOLINT
-  QueueContext* ctx =
-      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
-  RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
+void TaskQueueLibevent::OnWakeup(int socket,
+                                 short flags,  // NOLINT
+                                 void* context) {
+  TaskQueueLibevent* me = static_cast<TaskQueueLibevent*>(context);
+  RTC_DCHECK(me->wakeup_pipe_out_ == socket);
   char buf;
   RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
   switch (buf) {
     case kQuit:
-      ctx->is_active = false;
-      event_base_loopbreak(ctx->queue->event_base_);
+      me->is_active_ = false;
+      event_base_loopbreak(me->event_base_);
       break;
     case kRunTask: {
       std::unique_ptr<QueuedTask> task;
       {
-        CritScope lock(&ctx->queue->pending_lock_);
-        RTC_DCHECK(!ctx->queue->pending_.empty());
-        task = std::move(ctx->queue->pending_.front());
-        ctx->queue->pending_.pop_front();
+        rtc::CritScope lock(&me->pending_lock_);
+        RTC_DCHECK(!me->pending_.empty());
+        task = std::move(me->pending_.front());
+        me->pending_.pop_front();
         RTC_DCHECK(task.get());
       }
       if (!task->Run())
@@ -348,46 +303,38 @@
 }
 
 // static
-void TaskQueue::Impl::RunTask(int fd, short flags, void* context) {  // NOLINT
+void TaskQueueLibevent::RunTask(int fd, short flags, void* context) {  // NOLINT
   auto* task = static_cast<QueuedTask*>(context);
   if (task->Run())
     delete task;
 }
 
 // static
-void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) {  // NOLINT
+void TaskQueueLibevent::RunTimer(int fd,
+                                 short flags,  // NOLINT
+                                 void* context) {
   TimerEvent* timer = static_cast<TimerEvent*>(context);
   if (!timer->task->Run())
     timer->task.release();
-  QueueContext* ctx =
-      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
-  ctx->pending_timers_.remove(timer);
+  timer->task_queue->pending_timers_.remove(timer);
   delete timer;
 }
 
-TaskQueue::TaskQueue(const char* queue_name, Priority priority)
-    : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+class TaskQueueLibeventFactory final : public TaskQueueFactory {
+ public:
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
+      absl::string_view name,
+      Priority priority) const override {
+    return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(
+        new TaskQueueLibevent(name,
+                              TaskQueuePriorityToThreadPriority(priority)));
+  }
+};
+
+}  // namespace
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory() {
+  return absl::make_unique<TaskQueueLibeventFactory>();
 }
 
-TaskQueue::~TaskQueue() {}
-
-// static
-TaskQueue* TaskQueue::Current() {
-  return TaskQueue::Impl::CurrentQueue();
-}
-
-// Used for DCHECKing the current queue.
-bool TaskQueue::IsCurrent() const {
-  return impl_->IsCurrent();
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
-  return TaskQueue::impl_->PostTask(std::move(task));
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                uint32_t milliseconds) {
-  return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
-}
-
-}  // namespace rtc
+}  // namespace webrtc
diff --git a/rtc_base/task_queue_libevent.h b/rtc_base/task_queue_libevent.h
new file mode 100644
index 0000000..aaa72d4
--- /dev/null
+++ b/rtc_base/task_queue_libevent.h
@@ -0,0 +1,24 @@
+/*
+ *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef RTC_BASE_TASK_QUEUE_LIBEVENT_H_
+#define RTC_BASE_TASK_QUEUE_LIBEVENT_H_
+
+#include <memory>
+
+#include "api/task_queue/task_queue_factory.h"
+
+namespace webrtc {
+
+std::unique_ptr<TaskQueueFactory> CreateTaskQueueLibeventFactory();
+
+}  // namespace webrtc
+
+#endif  // RTC_BASE_TASK_QUEUE_LIBEVENT_H_