Reland of New task queueing primitive for async tasks: TaskQueue.

New task queueing primitive for async tasks: TaskQueue.
TaskQueue is a new way to asynchronously execute tasks sequentially
in a thread safe manner with minimal locking.  The implementation
uses OS supported APIs to do this that are compatible with async IO
notifications from things like sockets and files.
This class is a part of rtc_base_approved, so can be used by both
the webrtc and libjingle parts of the WebRTC library.  Moving forward,
we can replace rtc::Thread and webrtc::ProcessThread with this implementation.
NOTE: It should not be assumed that all tasks that execute on a TaskQueue,
run on the same thread.  E.g. on Mac and iOS, we use GCD dispatch queues
which means that tasks might execute on different threads depending on
what's the most efficient thing to do.

TBR=perkj@webrtc.org,phoglund@webrtc.org

Review-Url: https://codereview.webrtc.org/1984503002
Cr-Commit-Position: refs/heads/master@{#12749}
diff --git a/webrtc/base/BUILD.gn b/webrtc/base/BUILD.gn
index cd7e5b5..cc8b76b 100644
--- a/webrtc/base/BUILD.gn
+++ b/webrtc/base/BUILD.gn
@@ -86,6 +86,7 @@
 
 # The subset of rtc_base approved for use outside of libjingle.
 static_library("rtc_base_approved") {
+  defines = []
   deps = []
   configs += [ "..:common_config" ]
   public_configs = [ "..:common_inherited_config" ]
@@ -147,6 +148,12 @@
     "swap_queue.h",
     "systeminfo.cc",
     "systeminfo.h",
+    "task_queue.h",
+    "task_queue_gcd.cc",
+    "task_queue_libevent.cc",
+    "task_queue_posix.cc",
+    "task_queue_posix.h",
+    "task_queue_win.cc",
     "template_util.h",
     "thread_annotations.h",
     "thread_checker.h",
@@ -172,6 +179,23 @@
       "logging_mac.mm",
     ]
   }
+
+  if (!is_win && !is_mac && !is_ios && !is_nacl) {
+    deps += [ "//base/third_party/libevent" ]
+    defines += [ "WEBRTC_BUILD_LIBEVENT" ]
+  }
+
+  if (is_mac || is_ios || is_win || is_nacl) {
+    sources -= [ "task_queue_libevent.cc" ]
+  }
+
+  if (is_linux || is_android || is_win || is_nacl) {
+    sources -= [ "task_queue_gcd.cc" ]
+  }
+
+  if (is_nacl) {
+    sources -= [ "task_queue_posix.cc" ]
+  }
 }
 
 static_library("rtc_base") {
diff --git a/webrtc/base/base.gyp b/webrtc/base/base.gyp
index 9cf936a..8eb881e 100644
--- a/webrtc/base/base.gyp
+++ b/webrtc/base/base.gyp
@@ -84,6 +84,12 @@
         'swap_queue.h',
         'systeminfo.cc',
         'systeminfo.h',
+        'task_queue.h',
+        'task_queue_libevent.cc',
+        'task_queue_gcd.cc',
+        'task_queue_posix.cc',
+        'task_queue_posix.h',
+        'task_queue_win.cc',
         'template_util.h',
         'thread_annotations.h',
         'thread_checker.h',
@@ -111,6 +117,24 @@
             'logging.h',
             'logging_mac.mm',
           ],
+          'conditions': [
+            ['build_libevent==1', {
+              'dependencies': [
+                '<(DEPTH)/base/third_party/libevent/libevent.gyp:libevent',
+              ],
+            }],
+          ],
+        }],
+        ['build_libevent!=1', {
+          'sources!': [ 'task_queue_libevent.cc' ],
+          'conditions': [
+            ['OS=="linux" or OS=="android"', {
+              'sources!': [ 'task_queue_posix.cc' ],
+            }],
+          ],
+        }],
+        ['build_libevent==1 or OS=="linux" or OS=="android" or OS=="win"', {
+          'sources!': [ 'task_queue_gcd.cc' ],
         }],
         ['OS=="mac" and build_with_chromium==0', {
           'all_dependent_settings': {
diff --git a/webrtc/base/base_tests.gyp b/webrtc/base/base_tests.gyp
index bc33028..063e8e1 100644
--- a/webrtc/base/base_tests.gyp
+++ b/webrtc/base/base_tests.gyp
@@ -108,6 +108,7 @@
           'swap_queue_unittest.cc',
           # TODO(ronghuawu): Reenable this test.
           # 'systeminfo_unittest.cc',
+          'task_queue_unittest.cc',
           'task_unittest.cc',
           'testclient_unittest.cc',
           'thread_checker_unittest.cc',
diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h
new file mode 100644
index 0000000..dad4f43
--- /dev/null
+++ b/webrtc/base/task_queue.h
@@ -0,0 +1,277 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_BASE_TASK_QUEUE_H_
+#define WEBRTC_BASE_TASK_QUEUE_H_
+
+#include <list>
+#include <memory>
+
+#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
+#include <dispatch/dispatch.h>
+#endif
+
+#include "webrtc/base/constructormagic.h"
+#include "webrtc/base/criticalsection.h"
+
+#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
+#include "webrtc/base/platform_thread.h"
+#endif
+
+#if defined(WEBRTC_BUILD_LIBEVENT)
+struct event_base;
+struct event;
+#endif
+
+namespace rtc {
+
+// Base interface for asynchronously executed tasks.
+// The interface basically consists of a single function, Run(), that executes
+// on the target queue.  For more details see the Run() method and TaskQueue.
+class QueuedTask {
+ public:
+  QueuedTask() {}
+  virtual ~QueuedTask() {}
+
+  // Main routine that will run when the task is executed on the desired queue.
+  // The task should return |true| to indicate that it should be deleted or
+  // |false| to indicate that the queue should consider ownership of the task
+  // having been transferred.  Returning |false| can be useful if a task has
+  // re-posted itself to a different queue or is otherwise being re-used.
+  virtual bool Run() = 0;
+
+ private:
+  RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
+};
+
+// Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
+template <class Closure>
+class ClosureTask : public QueuedTask {
+ public:
+  explicit ClosureTask(const Closure& closure) : closure_(closure) {}
+
+ private:
+  bool Run() override {
+    closure_();
+    return true;
+  }
+
+  Closure closure_;
+};
+
+// Extends ClosureTask to also allow specifying cleanup code.
+// This is useful when using lambdas if guaranteeing cleanup, even if a task
+// was dropped (queue is too full), is required.
+template <class Closure, class Cleanup>
+class ClosureTaskWithCleanup : public ClosureTask<Closure> {
+ public:
+  ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup)
+      : ClosureTask<Closure>(closure), cleanup_(cleanup) {}
+  ~ClosureTaskWithCleanup() { cleanup_(); }
+
+ private:
+  Cleanup cleanup_;
+};
+
+// Convenience function to construct closures that can be passed directly
+// to methods that support std::unique_ptr<QueuedTask> but not template
+// based parameters.
+template <class Closure>
+static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) {
+  return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure));
+}
+
+template <class Closure, class Cleanup>
+static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
+                                              const Cleanup& cleanup) {
+  return std::unique_ptr<QueuedTask>(
+      new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup));
+}
+
+// Implements a task queue that asynchronously executes tasks in a way that
+// guarantees that they're executed in FIFO order and that tasks never overlap.
+// Tasks may always execute on the same worker thread and they may not.
+// To DCHECK that tasks are executing on a known task queue, use IsCurrent().
+//
+// Here are some usage examples:
+//
+//   1) Asynchronously running a lambda:
+//
+//     class MyClass {
+//       ...
+//       TaskQueue queue_("MyQueue");
+//     };
+//
+//     void MyClass::StartWork() {
+//       queue_.PostTask([]() { Work(); });
+//     ...
+//
+//   2) Doing work asynchronously on a worker queue and providing a notification
+//      callback on the current queue, when the work has been done:
+//
+//     void MyClass::StartWorkAndLetMeKnowWhenDone(
+//         std::unique_ptr<QueuedTask> callback) {
+//       DCHECK(TaskQueue::Current()) << "Need to be running on a queue";
+//       queue_.PostTaskAndReply([]() { Work(); }, std::move(callback));
+//     }
+//     ...
+//     my_class->StartWorkAndLetMeKnowWhenDone(
+//         NewClosure([]() { LOG(INFO) << "The work is done!";}));
+//
+//   3) Posting a custom task on a timer.  The task posts itself again after
+//      every running:
+//
+//     class TimerTask : public QueuedTask {
+//      public:
+//       TimerTask() {}
+//      private:
+//       bool Run() override {
+//         ++count_;
+//         TaskQueue::Current()->PostDelayedTask(
+//             std::unique_ptr<QueuedTask>(this), 1000);
+//         // Ownership has been transferred to the next occurance,
+//         // so return false to prevent from being deleted now.
+//         return false;
+//       }
+//       int count_ = 0;
+//     };
+//     ...
+//     queue_.PostDelayedTask(
+//         std::unique_ptr<QueuedTask>(new TimerTask()), 1000);
+//
+// For more examples, see task_queue_unittests.cc.
+//
+// A note on destruction:
+//
+// When a TaskQueue is deleted, pending tasks will not be executed but they will
+// be deleted.  The deletion of tasks may happen asynchronously after the
+// TaskQueue itself has been deleted or it may happen synchronously while the
+// TaskQueue instance is being deleted.  This may vary from one OS to the next
+// so assumptions about lifetimes of pending tasks should not be made.
+class TaskQueue {
+ public:
+  explicit TaskQueue(const char* queue_name);
+  // TODO(tommi): Implement move semantics?
+  ~TaskQueue();
+
+  static TaskQueue* Current();
+
+  // Used for DCHECKing the current queue.
+  static bool IsCurrent(const char* queue_name);
+  bool IsCurrent() const;
+
+  // TODO(tommi): For better debuggability, implement FROM_HERE.
+
+  // Ownership of the task is passed to PostTask.
+  void PostTask(std::unique_ptr<QueuedTask> task);
+  void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                        std::unique_ptr<QueuedTask> reply,
+                        TaskQueue* reply_queue);
+  void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                        std::unique_ptr<QueuedTask> reply);
+
+  void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+
+  template <class Closure>
+  void PostTask(const Closure& closure) {
+    PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
+  }
+
+  template <class Closure>
+  void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
+    PostDelayedTask(
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
+        milliseconds);
+  }
+
+  template <class Closure1, class Closure2>
+  void PostTaskAndReply(const Closure1& task,
+                        const Closure2& reply,
+                        TaskQueue* reply_queue) {
+    PostTaskAndReply(
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
+        reply_queue);
+  }
+
+  template <class Closure>
+  void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                        const Closure& reply) {
+    PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>(
+                                          new ClosureTask<Closure>(reply)));
+  }
+
+  template <class Closure>
+  void PostTaskAndReply(const Closure& task,
+                        std::unique_ptr<QueuedTask> reply) {
+    PostTaskAndReply(
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)),
+        std::move(reply));
+  }
+
+  template <class Closure1, class Closure2>
+  void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
+    PostTaskAndReply(
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
+        std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
+  }
+
+ private:
+#if defined(WEBRTC_BUILD_LIBEVENT)
+  static bool ThreadMain(void* context);
+  static void OnWakeup(int socket, short flags, void* context);  // NOLINT
+  static void RunTask(int fd, short flags, void* context);       // NOLINT
+  static void RunTimer(int fd, short flags, void* context);      // NOLINT
+
+  class PostAndReplyTask;
+  class SetTimerTask;
+
+  void PrepareReplyTask(PostAndReplyTask* reply_task);
+  void ReplyTaskDone(PostAndReplyTask* reply_task);
+
+  struct QueueContext;
+
+  int wakeup_pipe_in_ = -1;
+  int wakeup_pipe_out_ = -1;
+  event_base* event_base_;
+  std::unique_ptr<event> wakeup_event_;
+  PlatformThread thread_;
+  rtc::CriticalSection pending_lock_;
+  std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
+  std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
+#elif defined(WEBRTC_MAC)
+  struct QueueContext;
+  struct TaskContext;
+  struct PostTaskAndReplyContext;
+  dispatch_queue_t queue_;
+  QueueContext* const context_;
+#elif defined(WEBRTC_WIN)
+  static bool ThreadMain(void* context);
+
+  class WorkerThread : public PlatformThread {
+   public:
+    WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
+        : PlatformThread(func, obj, thread_name) {}
+
+    bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
+      return PlatformThread::QueueAPC(apc_function, data);
+    }
+  };
+  WorkerThread thread_;
+#else
+#error not supported.
+#endif
+
+  RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
+};
+
+}  // namespace rtc
+
+#endif  // WEBRTC_BASE_TASK_QUEUE_H_
diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc
new file mode 100644
index 0000000..2c7d649
--- /dev/null
+++ b/webrtc/base/task_queue_gcd.cc
@@ -0,0 +1,167 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+// This file contains the implementation of TaskQueue for Mac and iOS.
+// The implementation uses Grand Central Dispatch queues (GCD) to
+// do the actual task queuing.
+
+#include "webrtc/base/task_queue.h"
+
+#include <string.h>
+
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/base/task_queue_posix.h"
+
+namespace rtc {
+using internal::GetQueuePtrTls;
+using internal::AutoSetCurrentQueuePtr;
+
+struct TaskQueue::QueueContext {
+  explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
+
+  static void SetNotActive(void* context) {
+    QueueContext* qc = static_cast<QueueContext*>(context);
+    qc->is_active = false;
+  }
+
+  static void DeleteContext(void* context) {
+    QueueContext* qc = static_cast<QueueContext*>(context);
+    delete qc;
+  }
+
+  TaskQueue* const queue;
+  bool is_active;
+};
+
+struct TaskQueue::TaskContext {
+  TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task)
+      : queue_ctx(queue_ctx), task(std::move(task)) {}
+  virtual ~TaskContext() {}
+
+  static void RunTask(void* context) {
+    std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context));
+    if (tc->queue_ctx->is_active) {
+      AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue);
+      if (!tc->task->Run())
+        tc->task.release();
+    }
+  }
+
+  QueueContext* const queue_ctx;
+  std::unique_ptr<QueuedTask> task;
+};
+
+// Special case context for holding two tasks, a |first_task| + the task
+// that's owned by the parent struct, TaskContext, that then becomes the
+// second (i.e. 'reply') task.
+struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext {
+  explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx,
+                                   std::unique_ptr<QueuedTask> first_task,
+                                   QueueContext* second_queue_ctx,
+                                   std::unique_ptr<QueuedTask> second_task)
+      : TaskContext(second_queue_ctx, std::move(second_task)),
+        first_queue_ctx(first_queue_ctx),
+        first_task(std::move(first_task)) {
+    // Retain the reply queue for as long as this object lives.
+    // If we don't, we may have memory leaks and/or failures.
+    dispatch_retain(first_queue_ctx->queue->queue_);
+  }
+  ~PostTaskAndReplyContext() override {
+    dispatch_release(first_queue_ctx->queue->queue_);
+  }
+
+  static void RunTask(void* context) {
+    auto* rc = static_cast<PostTaskAndReplyContext*>(context);
+    if (rc->first_queue_ctx->is_active) {
+      AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue);
+      if (!rc->first_task->Run())
+        rc->first_task.release();
+    }
+    // Post the reply task.  This hands the work over to the parent struct.
+    // This task will eventually delete |this|.
+    dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask);
+  }
+
+  QueueContext* const first_queue_ctx;
+  std::unique_ptr<QueuedTask> first_task;
+};
+
+TaskQueue::TaskQueue(const char* queue_name)
+    : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
+      context_(new QueueContext(this)) {
+  RTC_DCHECK(queue_name);
+  RTC_CHECK(queue_);
+  dispatch_set_context(queue_, context_);
+  // Assign a finalizer that will delete the context when the last reference
+  // to the queue is released.  This may run after the TaskQueue object has
+  // been deleted.
+  dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
+}
+
+TaskQueue::~TaskQueue() {
+  RTC_DCHECK(!IsCurrent());
+  // Implementation/behavioral note:
+  // Dispatch queues are reference counted via calls to dispatch_retain and
+  // dispatch_release. Pending blocks submitted to a queue also hold a
+  // reference to the queue until they have finished. Once all references to a
+  // queue have been released, the queue will be deallocated by the system.
+  // This is why we check the context before running tasks.
+
+  // Use dispatch_sync to set the context to null to guarantee that there's not
+  // a race between checking the context and using it from a task.
+  dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive);
+  dispatch_release(queue_);
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls()));
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  TaskQueue* current = Current();
+  return current &&
+         strcmp(queue_name, dispatch_queue_get_label(current->queue_)) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+  RTC_DCHECK(queue_);
+  return this == Current();
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  auto* context = new TaskContext(context_, std::move(task));
+  dispatch_async_f(queue_, context, &TaskContext::RunTask);
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  auto* context = new TaskContext(context_, std::move(task));
+  dispatch_after_f(
+      dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_,
+      context, &TaskContext::RunTask);
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  auto* context = new PostTaskAndReplyContext(
+      context_, std::move(task), reply_queue->context_, std::move(reply));
+  dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask);
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+}  // namespace rtc
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
new file mode 100644
index 0000000..a59b450
--- /dev/null
+++ b/webrtc/base/task_queue_libevent.cc
@@ -0,0 +1,318 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue.h"
+
+#include <fcntl.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "base/third_party/libevent/event.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+#include "webrtc/base/task_queue_posix.h"
+#include "webrtc/base/timeutils.h"
+
+namespace rtc {
+using internal::GetQueuePtrTls;
+using internal::AutoSetCurrentQueuePtr;
+
+namespace {
+static const char kQuit = 1;
+static const char kRunTask = 2;
+
+struct TimerEvent {
+  explicit TimerEvent(std::unique_ptr<QueuedTask> task)
+      : task(std::move(task)) {}
+  ~TimerEvent() { event_del(&ev); }
+  event ev;
+  std::unique_ptr<QueuedTask> task;
+};
+
+bool SetNonBlocking(int fd) {
+  const int flags = fcntl(fd, F_GETFL);
+  RTC_CHECK(flags != -1);
+  return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
+}
+}  // namespace
+
+struct TaskQueue::QueueContext {
+  explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
+  TaskQueue* queue;
+  bool is_active;
+  // Holds a list of events pending timers for cleanup when the loop exits.
+  std::list<TimerEvent*> pending_timers_;
+};
+
+class TaskQueue::PostAndReplyTask : public QueuedTask {
+ public:
+  PostAndReplyTask(std::unique_ptr<QueuedTask> task,
+                   std::unique_ptr<QueuedTask> reply,
+                   TaskQueue* reply_queue)
+      : task_(std::move(task)),
+        reply_(std::move(reply)),
+        reply_queue_(reply_queue) {
+    reply_queue->PrepareReplyTask(this);
+  }
+
+  ~PostAndReplyTask() override {
+    CritScope lock(&lock_);
+    if (reply_queue_)
+      reply_queue_->ReplyTaskDone(this);
+  }
+
+  void OnReplyQueueGone() {
+    CritScope lock(&lock_);
+    reply_queue_ = nullptr;
+  }
+
+ private:
+  bool Run() override {
+    if (!task_->Run())
+      task_.release();
+
+    CritScope lock(&lock_);
+    if (reply_queue_)
+      reply_queue_->PostTask(std::move(reply_));
+    return true;
+  }
+
+  CriticalSection lock_;
+  std::unique_ptr<QueuedTask> task_;
+  std::unique_ptr<QueuedTask> reply_;
+  TaskQueue* reply_queue_ GUARDED_BY(lock_);
+};
+
+class TaskQueue::SetTimerTask : public QueuedTask {
+ public:
+  SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
+      : task_(std::move(task)),
+        milliseconds_(milliseconds),
+        posted_(Time32()) {}
+
+ private:
+  bool Run() override {
+    // Compensate for the time that has passed since construction
+    // and until we got here.
+    uint32_t post_time = Time32() - posted_;
+    TaskQueue::Current()->PostDelayedTask(
+        std::move(task_),
+        post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
+    return true;
+  }
+
+  std::unique_ptr<QueuedTask> task_;
+  const uint32_t milliseconds_;
+  const uint32_t posted_;
+};
+
+TaskQueue::TaskQueue(const char* queue_name)
+    : event_base_(event_base_new()),
+      wakeup_event_(new event()),
+      thread_(&TaskQueue::ThreadMain, this, queue_name) {
+  RTC_DCHECK(queue_name);
+  int fds[2];
+  RTC_CHECK(pipe(fds) == 0);
+  SetNonBlocking(fds[0]);
+  SetNonBlocking(fds[1]);
+  wakeup_pipe_out_ = fds[0];
+  wakeup_pipe_in_ = fds[1];
+  event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
+            OnWakeup, this);
+  event_base_set(event_base_, wakeup_event_.get());
+  event_add(wakeup_event_.get(), 0);
+  thread_.Start();
+}
+
+TaskQueue::~TaskQueue() {
+  RTC_DCHECK(!IsCurrent());
+  struct timespec ts;
+  char message = kQuit;
+  while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+    // The queue is full, so we have no choice but to wait and retry.
+    RTC_CHECK_EQ(EAGAIN, errno);
+    ts.tv_sec = 0;
+    ts.tv_nsec = 1000000;
+    nanosleep(&ts, nullptr);
+  }
+
+  thread_.Stop();
+
+  event_del(wakeup_event_.get());
+  close(wakeup_pipe_in_);
+  close(wakeup_pipe_out_);
+  wakeup_pipe_in_ = -1;
+  wakeup_pipe_out_ = -1;
+
+  {
+    // Synchronize against any pending reply tasks that might be running on
+    // other queues.
+    CritScope lock(&pending_lock_);
+    for (auto* reply : pending_replies_)
+      reply->OnReplyQueueGone();
+    pending_replies_.clear();
+  }
+
+  event_base_free(event_base_);
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  return ctx ? ctx->queue : nullptr;
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  TaskQueue* current = Current();
+  return current && current->thread_.name().compare(queue_name) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  RTC_DCHECK(task.get());
+  // libevent isn't thread safe.  This means that we can't use methods such
+  // as event_base_once to post tasks to the worker thread from a different
+  // thread.  However, we can use it when posting from the worker thread itself.
+  if (IsCurrent()) {
+    if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
+                        task.get(), nullptr) == 0) {
+      task.release();
+    }
+  } else {
+    QueuedTask* task_id = task.get();  // Only used for comparison.
+    {
+      CritScope lock(&pending_lock_);
+      pending_.push_back(std::move(task));
+    }
+    char message = kRunTask;
+    if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+      LOG(WARNING) << "Failed to queue task.";
+      CritScope lock(&pending_lock_);
+      pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
+        return t.get() == task_id;
+      });
+    }
+  }
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  if (IsCurrent()) {
+    TimerEvent* timer = new TimerEvent(std::move(task));
+    evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
+    event_base_set(event_base_, &timer->ev);
+    QueueContext* ctx =
+        static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+    ctx->pending_timers_.push_back(timer);
+    timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
+    event_add(&timer->ev, &tv);
+  } else {
+    PostTask(std::unique_ptr<QueuedTask>(
+        new SetTimerTask(std::move(task), milliseconds)));
+  }
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  std::unique_ptr<QueuedTask> wrapper_task(
+      new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
+  PostTask(std::move(wrapper_task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+// static
+bool TaskQueue::ThreadMain(void* context) {
+  TaskQueue* me = static_cast<TaskQueue*>(context);
+
+  QueueContext queue_context(me);
+  pthread_setspecific(GetQueuePtrTls(), &queue_context);
+
+  while (queue_context.is_active)
+    event_base_loop(me->event_base_, 0);
+
+  pthread_setspecific(GetQueuePtrTls(), nullptr);
+
+  for (TimerEvent* timer : queue_context.pending_timers_)
+    delete timer;
+
+  return false;
+}
+
+// static
+void TaskQueue::OnWakeup(int socket, short flags, void* context) {  // NOLINT
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
+  char buf;
+  RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
+  switch (buf) {
+    case kQuit:
+      ctx->is_active = false;
+      event_base_loopbreak(ctx->queue->event_base_);
+      break;
+    case kRunTask: {
+      std::unique_ptr<QueuedTask> task;
+      {
+        CritScope lock(&ctx->queue->pending_lock_);
+        RTC_DCHECK(!ctx->queue->pending_.empty());
+        task = std::move(ctx->queue->pending_.front());
+        ctx->queue->pending_.pop_front();
+        RTC_DCHECK(task.get());
+      }
+      if (!task->Run())
+        task.release();
+      break;
+    }
+    default:
+      RTC_NOTREACHED();
+      break;
+  }
+}
+
+// static
+void TaskQueue::RunTask(int fd, short flags, void* context) {  // NOLINT
+  auto* task = static_cast<QueuedTask*>(context);
+  if (task->Run())
+    delete task;
+}
+
+// static
+void TaskQueue::RunTimer(int fd, short flags, void* context) {  // NOLINT
+  TimerEvent* timer = static_cast<TimerEvent*>(context);
+  if (!timer->task->Run())
+    timer->task.release();
+  QueueContext* ctx =
+      static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
+  ctx->pending_timers_.remove(timer);
+  delete timer;
+}
+
+void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
+  RTC_DCHECK(reply_task);
+  CritScope lock(&pending_lock_);
+  pending_replies_.push_back(reply_task);
+}
+
+void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
+  CritScope lock(&pending_lock_);
+  pending_replies_.remove(reply_task);
+}
+
+}  // namespace rtc
diff --git a/webrtc/base/task_queue_posix.cc b/webrtc/base/task_queue_posix.cc
new file mode 100644
index 0000000..3b00ac8
--- /dev/null
+++ b/webrtc/base/task_queue_posix.cc
@@ -0,0 +1,40 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue_posix.h"
+
+#include "webrtc/base/checks.h"
+#include "webrtc/base/task_queue.h"
+
+namespace rtc {
+namespace internal {
+pthread_key_t g_queue_ptr_tls = 0;
+
+void InitializeTls() {
+  RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0);
+}
+
+pthread_key_t GetQueuePtrTls() {
+  static pthread_once_t init_once = PTHREAD_ONCE_INIT;
+  RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0);
+  return g_queue_ptr_tls;
+}
+
+AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q)
+    : prev_(TaskQueue::Current()) {
+  pthread_setspecific(GetQueuePtrTls(), q);
+}
+
+AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() {
+  pthread_setspecific(GetQueuePtrTls(), prev_);
+}
+
+}  // namespace internal
+}  // namespace rtc
diff --git a/webrtc/base/task_queue_posix.h b/webrtc/base/task_queue_posix.h
new file mode 100644
index 0000000..b677b78
--- /dev/null
+++ b/webrtc/base/task_queue_posix.h
@@ -0,0 +1,36 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_BASE_TASK_QUEUE_POSIX_H_
+#define WEBRTC_BASE_TASK_QUEUE_POSIX_H_
+
+#include <pthread.h>
+
+namespace rtc {
+
+class TaskQueue;
+
+namespace internal {
+
+class AutoSetCurrentQueuePtr {
+ public:
+  explicit AutoSetCurrentQueuePtr(TaskQueue* q);
+  ~AutoSetCurrentQueuePtr();
+
+ private:
+  TaskQueue* const prev_;
+};
+
+pthread_key_t GetQueuePtrTls();
+
+}  // namespace internal
+}  // namespace rtc
+
+#endif  // WEBRTC_BASE_TASK_QUEUE_POSIX_H_
diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc
new file mode 100644
index 0000000..db4e6c2
--- /dev/null
+++ b/webrtc/base/task_queue_unittest.cc
@@ -0,0 +1,261 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <memory>
+#include <vector>
+
+#include "webrtc/base/bind.h"
+#include "webrtc/base/event.h"
+#include "webrtc/base/gunit.h"
+#include "webrtc/base/task_queue.h"
+#include "webrtc/base/timeutils.h"
+
+namespace rtc {
+
+namespace {
+void CheckCurrent(const char* expected_queue, Event* signal, TaskQueue* queue) {
+  EXPECT_TRUE(TaskQueue::IsCurrent(expected_queue));
+  EXPECT_TRUE(queue->IsCurrent());
+  if (signal)
+    signal->Set();
+}
+
+}  // namespace
+
+TEST(TaskQueueTest, Construct) {
+  static const char kQueueName[] = "Construct";
+  TaskQueue queue(kQueueName);
+  EXPECT_FALSE(queue.IsCurrent());
+}
+
+TEST(TaskQueueTest, PostAndCheckCurrent) {
+  static const char kQueueName[] = "PostAndCheckCurrent";
+  TaskQueue queue(kQueueName);
+
+  // We're not running a task, so there shouldn't be a current queue.
+  EXPECT_FALSE(queue.IsCurrent());
+  EXPECT_FALSE(TaskQueue::Current());
+
+  Event event(false, false);
+  queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue));
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostCustomTask) {
+  static const char kQueueName[] = "PostCustomImplementation";
+  TaskQueue queue(kQueueName);
+
+  Event event(false, false);
+
+  class CustomTask : public QueuedTask {
+   public:
+    explicit CustomTask(Event* event) : event_(event) {}
+
+   private:
+    bool Run() override {
+      event_->Set();
+      return false;  // Never allows the task to be deleted by the queue.
+    }
+
+    Event* const event_;
+  } my_task(&event);
+
+  // Please don't do this in production code! :)
+  queue.PostTask(std::unique_ptr<QueuedTask>(&my_task));
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostLambda) {
+  static const char kQueueName[] = "PostLambda";
+  TaskQueue queue(kQueueName);
+
+  Event event(false, false);
+  queue.PostTask([&event]() { event.Set(); });
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostFromQueue) {
+  static const char kQueueName[] = "PostFromQueue";
+  TaskQueue queue(kQueueName);
+
+  Event event(false, false);
+  queue.PostTask(
+      [&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); });
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostDelayed) {
+  static const char kQueueName[] = "PostDelayed";
+  TaskQueue queue(kQueueName);
+
+  Event event(false, false);
+  uint32_t start = Time();
+  queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
+  EXPECT_TRUE(event.Wait(1000));
+  uint32_t end = Time();
+  EXPECT_GE(end - start, 100u);
+  EXPECT_NEAR(end - start, 200u, 100u);  // Accept 100-300.
+}
+
+TEST(TaskQueueTest, PostMultipleDelayed) {
+  static const char kQueueName[] = "PostMultipleDelayed";
+  TaskQueue queue(kQueueName);
+
+  std::vector<std::unique_ptr<Event>> events;
+  for (int i = 0; i < 10; ++i) {
+    events.push_back(std::unique_ptr<Event>(new Event(false, false)));
+    queue.PostDelayedTask(
+        Bind(&CheckCurrent, kQueueName, events.back().get(), &queue), 10);
+  }
+
+  for (const auto& e : events)
+    EXPECT_TRUE(e->Wait(100));
+}
+
+TEST(TaskQueueTest, PostDelayedAfterDestruct) {
+  static const char kQueueName[] = "PostDelayedAfterDestruct";
+  Event event(false, false);
+  {
+    TaskQueue queue(kQueueName);
+    queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
+  }
+  EXPECT_FALSE(event.Wait(200));  // Task should not run.
+}
+
+TEST(TaskQueueTest, PostAndReply) {
+  static const char kPostQueue[] = "PostQueue";
+  static const char kReplyQueue[] = "ReplyQueue";
+  TaskQueue post_queue(kPostQueue);
+  TaskQueue reply_queue(kReplyQueue);
+
+  Event event(false, false);
+  post_queue.PostTaskAndReply(
+      Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue),
+      Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue);
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostAndReuse) {
+  static const char kPostQueue[] = "PostQueue";
+  static const char kReplyQueue[] = "ReplyQueue";
+  TaskQueue post_queue(kPostQueue);
+  TaskQueue reply_queue(kReplyQueue);
+
+  int call_count = 0;
+
+  class ReusedTask : public QueuedTask {
+   public:
+    ReusedTask(int* counter, TaskQueue* reply_queue, Event* event)
+        : counter_(counter), reply_queue_(reply_queue), event_(event) {
+      EXPECT_EQ(0, *counter_);
+    }
+
+   private:
+    bool Run() override {
+      if (++(*counter_) == 1) {
+        std::unique_ptr<QueuedTask> myself(this);
+        reply_queue_->PostTask(std::move(myself));
+        // At this point, the object is owned by reply_queue_ and it's
+        // theoratically possible that the object has been deleted (e.g. if
+        // posting wasn't possible).  So, don't touch any member variables here.
+
+        // Indicate to the current queue that ownership has been transferred.
+        return false;
+      } else {
+        EXPECT_EQ(2, *counter_);
+        EXPECT_TRUE(reply_queue_->IsCurrent());
+        event_->Set();
+        return true;  // Indicate that the object should be deleted.
+      }
+    }
+
+    int* const counter_;
+    TaskQueue* const reply_queue_;
+    Event* const event_;
+  };
+
+  Event event(false, false);
+  std::unique_ptr<QueuedTask> task(
+      new ReusedTask(&call_count, &reply_queue, &event));
+
+  post_queue.PostTask(std::move(task));
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+TEST(TaskQueueTest, PostAndReplyLambda) {
+  static const char kPostQueue[] = "PostQueue";
+  static const char kReplyQueue[] = "ReplyQueue";
+  TaskQueue post_queue(kPostQueue);
+  TaskQueue reply_queue(kReplyQueue);
+
+  Event event(false, false);
+  bool my_flag = false;
+  post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; },
+                              [&event]() { event.Set(); }, &reply_queue);
+  EXPECT_TRUE(event.Wait(1000));
+  EXPECT_TRUE(my_flag);
+}
+
+void TestPostTaskAndReply(TaskQueue* work_queue,
+                          const char* work_queue_name,
+                          Event* event) {
+  ASSERT_FALSE(work_queue->IsCurrent());
+  work_queue->PostTaskAndReply(
+      Bind(&CheckCurrent, work_queue_name, nullptr, work_queue),
+      NewClosure([event]() { event->Set(); }));
+}
+
+// Does a PostTaskAndReply from within a task to post and reply to the current
+// queue.  All in all there will be 3 tasks posted and run.
+TEST(TaskQueueTest, PostAndReply2) {
+  static const char kQueueName[] = "PostAndReply2";
+  static const char kWorkQueueName[] = "PostAndReply2_Worker";
+  TaskQueue queue(kQueueName);
+  TaskQueue work_queue(kWorkQueueName);
+
+  Event event(false, false);
+  queue.PostTask(
+      Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event));
+  EXPECT_TRUE(event.Wait(1000));
+}
+
+// Tests posting more messages than a queue can queue up.
+// In situations like that, tasks will get dropped.
+TEST(TaskQueueTest, PostALot) {
+  // To destruct the event after the queue has gone out of scope.
+  Event event(false, false);
+
+  int tasks_executed = 0;
+  int tasks_cleaned_up = 0;
+  static const int kTaskCount = 0xffff;
+
+  {
+    static const char kQueueName[] = "PostALot";
+    TaskQueue queue(kQueueName);
+
+    // On linux, the limit of pending bytes in the pipe buffer is 0xffff.
+    // So here we post a total of 0xffff+1 messages, which triggers a failure
+    // case inside of the libevent queue implementation.
+
+    queue.PostTask([&event]() { event.Wait(Event::kForever); });
+    for (int i = 0; i < kTaskCount; ++i)
+      queue.PostTask(NewClosure([&tasks_executed]() { ++tasks_executed; },
+                                [&tasks_cleaned_up]() { ++tasks_cleaned_up; }));
+    event.Set();  // Unblock the first task.
+  }
+
+  EXPECT_GE(tasks_cleaned_up, tasks_executed);
+  EXPECT_EQ(kTaskCount, tasks_cleaned_up);
+
+  LOG(INFO) << "tasks executed: " << tasks_executed
+            << ", tasks cleaned up: " << tasks_cleaned_up;
+}
+
+}  // namespace rtc
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
new file mode 100644
index 0000000..5ae6d92
--- /dev/null
+++ b/webrtc/base/task_queue_win.cc
@@ -0,0 +1,184 @@
+/*
+ *  Copyright 2016 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/base/task_queue.h"
+
+#include <string.h>
+#include <unordered_map>
+
+#include "webrtc/base/checks.h"
+#include "webrtc/base/logging.h"
+
+namespace rtc {
+namespace {
+#define WM_RUN_TASK WM_USER + 1
+#define WM_QUEUE_DELAYED_TASK WM_USER + 2
+
+DWORD g_queue_ptr_tls = 0;
+
+BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
+  g_queue_ptr_tls = TlsAlloc();
+  return TRUE;
+}
+
+DWORD GetQueuePtrTls() {
+  static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
+  InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
+  return g_queue_ptr_tls;
+}
+
+struct ThreadStartupData {
+  Event* started;
+  void* thread_context;
+};
+
+void CALLBACK InitializeQueueThread(ULONG_PTR param) {
+  MSG msg;
+  PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
+  ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
+  TlsSetValue(GetQueuePtrTls(), data->thread_context);
+  data->started->Set();
+}
+}  // namespace
+
+TaskQueue::TaskQueue(const char* queue_name)
+    : thread_(&TaskQueue::ThreadMain, this, queue_name) {
+  RTC_DCHECK(queue_name);
+  thread_.Start();
+  Event event(false, false);
+  ThreadStartupData startup = {&event, this};
+  RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
+                             reinterpret_cast<ULONG_PTR>(&startup)));
+  event.Wait(Event::kForever);
+}
+
+TaskQueue::~TaskQueue() {
+  RTC_DCHECK(!IsCurrent());
+  while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
+    RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError());
+    Sleep(1);
+  }
+  thread_.Stop();
+}
+
+// static
+TaskQueue* TaskQueue::Current() {
+  return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls()));
+}
+
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+  TaskQueue* current = Current();
+  return current && current->thread_.name().compare(queue_name) == 0;
+}
+
+bool TaskQueue::IsCurrent() const {
+  return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+  if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
+                        reinterpret_cast<LPARAM>(task.get()))) {
+    task.release();
+  }
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+                                uint32_t milliseconds) {
+  WPARAM wparam;
+#if defined(_WIN64)
+  // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
+  // so this compensation isn't that accurate, but since we have unused 32 bits
+  // on Win64, we might as well use them.
+  wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
+#else
+  wparam = milliseconds;
+#endif
+  if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
+                        reinterpret_cast<LPARAM>(task.get()))) {
+    task.release();
+  }
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply,
+                                 TaskQueue* reply_queue) {
+  QueuedTask* task_ptr = task.release();
+  QueuedTask* reply_task_ptr = reply.release();
+  DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
+  PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
+    if (task_ptr->Run())
+      delete task_ptr;
+    // If the thread's message queue is full, we can't queue the task and will
+    // have to drop it (i.e. delete).
+    if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
+                           reinterpret_cast<LPARAM>(reply_task_ptr))) {
+      delete reply_task_ptr;
+    }
+  });
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+                                 std::unique_ptr<QueuedTask> reply) {
+  return PostTaskAndReply(std::move(task), std::move(reply), Current());
+}
+
+// static
+bool TaskQueue::ThreadMain(void* context) {
+  std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> delayed_tasks;
+
+  BOOL ret;
+  MSG msg;
+
+  while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) {
+    if (!msg.hwnd) {
+      switch (msg.message) {
+        case WM_RUN_TASK: {
+          QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
+          if (task->Run())
+            delete task;
+          break;
+        }
+        case WM_QUEUE_DELAYED_TASK: {
+          QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
+          uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
+#if defined(_WIN64)
+          // Subtract the time it took to queue the timer.
+          const DWORD now = GetTickCount();
+          DWORD post_time = now - (msg.wParam >> 32);
+          milliseconds =
+              post_time > milliseconds ? 0 : milliseconds - post_time;
+#endif
+          UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr);
+          delayed_tasks.insert(std::make_pair(timer_id, task));
+          break;
+        }
+        case WM_TIMER: {
+          KillTimer(nullptr, msg.wParam);
+          auto found = delayed_tasks.find(msg.wParam);
+          RTC_DCHECK(found != delayed_tasks.end());
+          if (!found->second->Run())
+            found->second.release();
+          delayed_tasks.erase(found);
+          break;
+        }
+        default:
+          RTC_NOTREACHED();
+          break;
+      }
+    } else {
+      TranslateMessage(&msg);
+      DispatchMessage(&msg);
+    }
+  }
+
+  return false;
+}
+}  // namespace rtc