Revert of New task queueing primitive for async tasks: TaskQueue. (patchset #5 id:80001 of https://codereview.webrtc.org/1919733002/ )
Reason for revert:
Reverting this temporarily while I figure out the issues with the Chrome on android GN debug build.
Original issue's description:
> New task queueing primitive for async tasks: TaskQueue.
> TaskQueue is a new way to asynchronously execute tasks sequentially
> in a thread safe manner with minimal locking. The implementation
> uses OS supported APIs to do this that are compatible with async IO
> notifications from things like sockets and files.
>
> This class is a part of rtc_base_approved, so can be used by both
> the webrtc and libjingle parts of the WebRTC library. Moving forward,
> we can replace rtc::Thread and webrtc::ProcessThread with this implementation.
>
> NOTE: It should not be assumed that all tasks that execute on a TaskQueue,
> run on the same thread. E.g. on Mac and iOS, we use GCD dispatch queues
> which means that tasks might execute on different threads depending on
> what's the most efficient thing to do.
TBR=perkj@webrtc.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
Review-Url: https://codereview.webrtc.org/1935483002
Cr-Commit-Position: refs/heads/master@{#12562}
diff --git a/webrtc/base/BUILD.gn b/webrtc/base/BUILD.gn
index 4eaf617..11e886c 100644
--- a/webrtc/base/BUILD.gn
+++ b/webrtc/base/BUILD.gn
@@ -154,12 +154,6 @@
"swap_queue.h",
"systeminfo.cc",
"systeminfo.h",
- "task_queue.h",
- "task_queue_gcd.cc",
- "task_queue_libevent.cc",
- "task_queue_posix.cc",
- "task_queue_posix.h",
- "task_queue_win.cc",
"template_util.h",
"thread_annotations.h",
"thread_checker.h",
@@ -184,17 +178,6 @@
"logging.h",
"logging_mac.mm",
]
- if (!is_win && !is_mac && !is_ios) {
- deps += [ "//base/third_party/libevent" ]
- }
- }
-
- if (is_mac || is_ios || is_win) {
- sources -= [ "task_queue_libevent.cc" ]
- }
-
- if (is_linux || is_android || is_win) {
- sources -= [ "task_queue_gcd.cc" ]
}
}
diff --git a/webrtc/base/base.gyp b/webrtc/base/base.gyp
index 3ea7b15..99ca457276 100644
--- a/webrtc/base/base.gyp
+++ b/webrtc/base/base.gyp
@@ -85,12 +85,6 @@
'swap_queue.h',
'systeminfo.cc',
'systeminfo.h',
- 'task_queue.h',
- 'task_queue_libevent.cc',
- 'task_queue_gcd.cc',
- 'task_queue_posix.cc',
- 'task_queue_posix.h',
- 'task_queue_win.cc',
'template_util.h',
'thread_annotations.h',
'thread_checker.h',
@@ -118,19 +112,6 @@
'logging.h',
'logging_mac.mm',
],
- 'conditions': [
- ['OS!="win" and OS!="mac" and OS!="ios"', {
- 'dependencies': [
- '<(DEPTH)/base/third_party/libevent/libevent.gyp:libevent',
- ],
- }],
- ],
- }],
- ['OS=="mac" or OS=="ios" or OS=="win"', {
- 'sources!': [ 'task_queue_libevent.cc' ],
- }],
- ['OS=="linux" or OS=="android" or OS=="win"', {
- 'sources!': [ 'task_queue_gcd.cc' ],
}],
['OS=="mac" and build_with_chromium==0', {
'all_dependent_settings': {
diff --git a/webrtc/base/base_tests.gyp b/webrtc/base/base_tests.gyp
index 019b5762..8248cef 100644
--- a/webrtc/base/base_tests.gyp
+++ b/webrtc/base/base_tests.gyp
@@ -100,7 +100,6 @@
'swap_queue_unittest.cc',
# TODO(ronghuawu): Reenable this test.
# 'systeminfo_unittest.cc',
- 'task_queue_unittest.cc',
'task_unittest.cc',
'testclient_unittest.cc',
'thread_checker_unittest.cc',
diff --git a/webrtc/base/task_queue.h b/webrtc/base/task_queue.h
deleted file mode 100644
index 520e5af..0000000
--- a/webrtc/base/task_queue.h
+++ /dev/null
@@ -1,281 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef WEBRTC_BASE_TASK_QUEUE_H_
-#define WEBRTC_BASE_TASK_QUEUE_H_
-
-#if defined(WEBRTC_POSIX) && !defined(WEBRTC_MAC)
-#define LIBEVENT_TASK_QUEUE
-#endif
-
-#include <list>
-#include <memory>
-
-#if defined(WEBRTC_MAC)
-#include <dispatch/dispatch.h>
-#endif
-
-#include "webrtc/base/constructormagic.h"
-#include "webrtc/base/criticalsection.h"
-
-#if !defined(WEBRTC_MAC)
-#include "webrtc/base/platform_thread.h"
-#endif
-
-#if defined(LIBEVENT_TASK_QUEUE)
-struct event_base;
-struct event;
-#endif
-
-namespace rtc {
-
-// Base interface for asynchronously executed tasks.
-// The interface basically consists of a single function, Run(), that executes
-// on the target queue. For more details see the Run() method and TaskQueue.
-class QueuedTask {
- public:
- QueuedTask() {}
- virtual ~QueuedTask() {}
-
- // Main routine that will run when the task is executed on the desired queue.
- // The task should return |true| to indicate that it should be deleted or
- // |false| to indicate that the queue should consider ownership of the task
- // having been transferred. Returning |false| can be useful if a task has
- // re-posted itself to a different queue or is otherwise being re-used.
- virtual bool Run() = 0;
-
- private:
- RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask);
-};
-
-// Simple implementation of QueuedTask for use with rtc::Bind and lambdas.
-template <class Closure>
-class ClosureTask : public QueuedTask {
- public:
- explicit ClosureTask(const Closure& closure) : closure_(closure) {}
-
- private:
- bool Run() override {
- closure_();
- return true;
- }
-
- Closure closure_;
-};
-
-// Extends ClosureTask to also allow specifying cleanup code.
-// This is useful when using lambdas if guaranteeing cleanup, even if a task
-// was dropped (queue is too full), is required.
-template <class Closure, class Cleanup>
-class ClosureTaskWithCleanup : public ClosureTask<Closure> {
- public:
- ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup)
- : ClosureTask<Closure>(closure), cleanup_(cleanup) {}
- ~ClosureTaskWithCleanup() { cleanup_(); }
-
- private:
- Cleanup cleanup_;
-};
-
-// Convenience function to construct closures that can be passed directly
-// to methods that support std::unique_ptr<QueuedTask> but not template
-// based parameters.
-template <class Closure>
-static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) {
- return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure));
-}
-
-template <class Closure, class Cleanup>
-static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure,
- const Cleanup& cleanup) {
- return std::unique_ptr<QueuedTask>(
- new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup));
-}
-
-// Implements a task queue that asynchronously executes tasks in a way that
-// guarantees that they're executed in FIFO order and that tasks never overlap.
-// Tasks may always execute on the same worker thread and they may not.
-// To DCHECK that tasks are executing on a known task queue, use IsCurrent().
-//
-// Here are some usage examples:
-//
-// 1) Asynchronously running a lambda:
-//
-// class MyClass {
-// ...
-// TaskQueue queue_("MyQueue");
-// };
-//
-// void MyClass::StartWork() {
-// queue_.PostTask([]() { Work(); });
-// ...
-//
-// 2) Doing work asynchronously on a worker queue and providing a notification
-// callback on the current queue, when the work has been done:
-//
-// void MyClass::StartWorkAndLetMeKnowWhenDone(
-// std::unique_ptr<QueuedTask> callback) {
-// DCHECK(TaskQueue::Current()) << "Need to be running on a queue";
-// queue_.PostTaskAndReply([]() { Work(); }, std::move(callback));
-// }
-// ...
-// my_class->StartWorkAndLetMeKnowWhenDone(
-// NewClosure([]() { LOG(INFO) << "The work is done!";}));
-//
-// 3) Posting a custom task on a timer. The task posts itself again after
-// every running:
-//
-// class TimerTask : public QueuedTask {
-// public:
-// TimerTask() {}
-// private:
-// bool Run() override {
-// ++count_;
-// TaskQueue::Current()->PostDelayedTask(
-// std::unique_ptr<QueuedTask>(this), 1000);
-// // Ownership has been transferred to the next occurance,
-// // so return false to prevent from being deleted now.
-// return false;
-// }
-// int count_ = 0;
-// };
-// ...
-// queue_.PostDelayedTask(
-// std::unique_ptr<QueuedTask>(new TimerTask()), 1000);
-//
-// For more examples, see task_queue_unittests.cc.
-//
-// A note on destruction:
-//
-// When a TaskQueue is deleted, pending tasks will not be executed but they will
-// be deleted. The deletion of tasks may happen asynchronously after the
-// TaskQueue itself has been deleted or it may happen synchronously while the
-// TaskQueue instance is being deleted. This may vary from one OS to the next
-// so assumptions about lifetimes of pending tasks should not be made.
-class TaskQueue {
- public:
- explicit TaskQueue(const char* queue_name);
- // TODO(tommi): Implement move semantics?
- ~TaskQueue();
-
- static TaskQueue* Current();
-
- // Used for DCHECKing the current queue.
- static bool IsCurrent(const char* queue_name);
- bool IsCurrent() const;
-
- // TODO(tommi): For better debuggability, implement FROM_HERE.
-
- // Ownership of the task is passed to PostTask.
- void PostTask(std::unique_ptr<QueuedTask> task);
- void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue);
- void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply);
-
- void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
-
- template <class Closure>
- void PostTask(const Closure& closure) {
- PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)));
- }
-
- template <class Closure>
- void PostDelayedTask(const Closure& closure, uint32_t milliseconds) {
- PostDelayedTask(
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)),
- milliseconds);
- }
-
- template <class Closure1, class Closure2>
- void PostTaskAndReply(const Closure1& task,
- const Closure2& reply,
- TaskQueue* reply_queue) {
- PostTaskAndReply(
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)),
- reply_queue);
- }
-
- template <class Closure>
- void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- const Closure& reply) {
- PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>(
- new ClosureTask<Closure>(reply)));
- }
-
- template <class Closure>
- void PostTaskAndReply(const Closure& task,
- std::unique_ptr<QueuedTask> reply) {
- PostTaskAndReply(
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)),
- std::move(reply));
- }
-
- template <class Closure1, class Closure2>
- void PostTaskAndReply(const Closure1& task, const Closure2& reply) {
- PostTaskAndReply(
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)),
- std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)));
- }
-
- private:
-#if defined(LIBEVENT_TASK_QUEUE)
- static bool ThreadMain(void* context);
- static void OnWakeup(int socket, short flags, void* context); // NOLINT
- static void RunTask(int fd, short flags, void* context); // NOLINT
- static void RunTimer(int fd, short flags, void* context); // NOLINT
-
- class PostAndReplyTask;
- class SetTimerTask;
-
- void PrepareReplyTask(PostAndReplyTask* reply_task);
- void ReplyTaskDone(PostAndReplyTask* reply_task);
-
- struct QueueContext;
-
- int wakeup_pipe_in_ = -1;
- int wakeup_pipe_out_ = -1;
- event_base* event_base_;
- std::unique_ptr<event> wakeup_event_;
- PlatformThread thread_;
- rtc::CriticalSection pending_lock_;
- std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
- std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_);
-#elif defined(WEBRTC_MAC)
- struct QueueContext;
- struct TaskContext;
- struct PostTaskAndReplyContext;
- dispatch_queue_t queue_;
- QueueContext* const context_;
-#elif defined(WEBRTC_WIN)
- static bool ThreadMain(void* context);
-
- class WorkerThread : public PlatformThread {
- public:
- WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
- : PlatformThread(func, obj, thread_name) {}
-
- bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
- return PlatformThread::QueueAPC(apc_function, data);
- }
- };
- WorkerThread thread_;
-#else
-#error not supported.
-#endif
-
- RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
-};
-
-} // namespace rtc
-
-#endif // WEBRTC_BASE_TASK_QUEUE_H_
diff --git a/webrtc/base/task_queue_gcd.cc b/webrtc/base/task_queue_gcd.cc
deleted file mode 100644
index 2c7d649..0000000
--- a/webrtc/base/task_queue_gcd.cc
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-// This file contains the implementation of TaskQueue for Mac and iOS.
-// The implementation uses Grand Central Dispatch queues (GCD) to
-// do the actual task queuing.
-
-#include "webrtc/base/task_queue.h"
-
-#include <string.h>
-
-#include "webrtc/base/checks.h"
-#include "webrtc/base/logging.h"
-#include "webrtc/base/task_queue_posix.h"
-
-namespace rtc {
-using internal::GetQueuePtrTls;
-using internal::AutoSetCurrentQueuePtr;
-
-struct TaskQueue::QueueContext {
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
-
- static void SetNotActive(void* context) {
- QueueContext* qc = static_cast<QueueContext*>(context);
- qc->is_active = false;
- }
-
- static void DeleteContext(void* context) {
- QueueContext* qc = static_cast<QueueContext*>(context);
- delete qc;
- }
-
- TaskQueue* const queue;
- bool is_active;
-};
-
-struct TaskQueue::TaskContext {
- TaskContext(QueueContext* queue_ctx, std::unique_ptr<QueuedTask> task)
- : queue_ctx(queue_ctx), task(std::move(task)) {}
- virtual ~TaskContext() {}
-
- static void RunTask(void* context) {
- std::unique_ptr<TaskContext> tc(static_cast<TaskContext*>(context));
- if (tc->queue_ctx->is_active) {
- AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue);
- if (!tc->task->Run())
- tc->task.release();
- }
- }
-
- QueueContext* const queue_ctx;
- std::unique_ptr<QueuedTask> task;
-};
-
-// Special case context for holding two tasks, a |first_task| + the task
-// that's owned by the parent struct, TaskContext, that then becomes the
-// second (i.e. 'reply') task.
-struct TaskQueue::PostTaskAndReplyContext : public TaskQueue::TaskContext {
- explicit PostTaskAndReplyContext(QueueContext* first_queue_ctx,
- std::unique_ptr<QueuedTask> first_task,
- QueueContext* second_queue_ctx,
- std::unique_ptr<QueuedTask> second_task)
- : TaskContext(second_queue_ctx, std::move(second_task)),
- first_queue_ctx(first_queue_ctx),
- first_task(std::move(first_task)) {
- // Retain the reply queue for as long as this object lives.
- // If we don't, we may have memory leaks and/or failures.
- dispatch_retain(first_queue_ctx->queue->queue_);
- }
- ~PostTaskAndReplyContext() override {
- dispatch_release(first_queue_ctx->queue->queue_);
- }
-
- static void RunTask(void* context) {
- auto* rc = static_cast<PostTaskAndReplyContext*>(context);
- if (rc->first_queue_ctx->is_active) {
- AutoSetCurrentQueuePtr set_current(rc->first_queue_ctx->queue);
- if (!rc->first_task->Run())
- rc->first_task.release();
- }
- // Post the reply task. This hands the work over to the parent struct.
- // This task will eventually delete |this|.
- dispatch_async_f(rc->queue_ctx->queue->queue_, rc, &TaskContext::RunTask);
- }
-
- QueueContext* const first_queue_ctx;
- std::unique_ptr<QueuedTask> first_task;
-};
-
-TaskQueue::TaskQueue(const char* queue_name)
- : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
- context_(new QueueContext(this)) {
- RTC_DCHECK(queue_name);
- RTC_CHECK(queue_);
- dispatch_set_context(queue_, context_);
- // Assign a finalizer that will delete the context when the last reference
- // to the queue is released. This may run after the TaskQueue object has
- // been deleted.
- dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
-}
-
-TaskQueue::~TaskQueue() {
- RTC_DCHECK(!IsCurrent());
- // Implementation/behavioral note:
- // Dispatch queues are reference counted via calls to dispatch_retain and
- // dispatch_release. Pending blocks submitted to a queue also hold a
- // reference to the queue until they have finished. Once all references to a
- // queue have been released, the queue will be deallocated by the system.
- // This is why we check the context before running tasks.
-
- // Use dispatch_sync to set the context to null to guarantee that there's not
- // a race between checking the context and using it from a task.
- dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive);
- dispatch_release(queue_);
-}
-
-// static
-TaskQueue* TaskQueue::Current() {
- return static_cast<TaskQueue*>(pthread_getspecific(GetQueuePtrTls()));
-}
-
-// static
-bool TaskQueue::IsCurrent(const char* queue_name) {
- TaskQueue* current = Current();
- return current &&
- strcmp(queue_name, dispatch_queue_get_label(current->queue_)) == 0;
-}
-
-bool TaskQueue::IsCurrent() const {
- RTC_DCHECK(queue_);
- return this == Current();
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- auto* context = new TaskContext(context_, std::move(task));
- dispatch_async_f(queue_, context, &TaskContext::RunTask);
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- auto* context = new TaskContext(context_, std::move(task));
- dispatch_after_f(
- dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_,
- context, &TaskContext::RunTask);
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue) {
- auto* context = new PostTaskAndReplyContext(
- context_, std::move(task), reply_queue->context_, std::move(reply));
- dispatch_async_f(queue_, context, &PostTaskAndReplyContext::RunTask);
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply) {
- return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
-} // namespace rtc
diff --git a/webrtc/base/task_queue_libevent.cc b/webrtc/base/task_queue_libevent.cc
deleted file mode 100644
index a59b450..0000000
--- a/webrtc/base/task_queue_libevent.cc
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "webrtc/base/task_queue.h"
-
-#include <fcntl.h>
-#include <string.h>
-#include <unistd.h>
-
-#include "base/third_party/libevent/event.h"
-#include "webrtc/base/checks.h"
-#include "webrtc/base/logging.h"
-#include "webrtc/base/task_queue_posix.h"
-#include "webrtc/base/timeutils.h"
-
-namespace rtc {
-using internal::GetQueuePtrTls;
-using internal::AutoSetCurrentQueuePtr;
-
-namespace {
-static const char kQuit = 1;
-static const char kRunTask = 2;
-
-struct TimerEvent {
- explicit TimerEvent(std::unique_ptr<QueuedTask> task)
- : task(std::move(task)) {}
- ~TimerEvent() { event_del(&ev); }
- event ev;
- std::unique_ptr<QueuedTask> task;
-};
-
-bool SetNonBlocking(int fd) {
- const int flags = fcntl(fd, F_GETFL);
- RTC_CHECK(flags != -1);
- return (flags & O_NONBLOCK) || fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
-}
-} // namespace
-
-struct TaskQueue::QueueContext {
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
- TaskQueue* queue;
- bool is_active;
- // Holds a list of events pending timers for cleanup when the loop exits.
- std::list<TimerEvent*> pending_timers_;
-};
-
-class TaskQueue::PostAndReplyTask : public QueuedTask {
- public:
- PostAndReplyTask(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue)
- : task_(std::move(task)),
- reply_(std::move(reply)),
- reply_queue_(reply_queue) {
- reply_queue->PrepareReplyTask(this);
- }
-
- ~PostAndReplyTask() override {
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->ReplyTaskDone(this);
- }
-
- void OnReplyQueueGone() {
- CritScope lock(&lock_);
- reply_queue_ = nullptr;
- }
-
- private:
- bool Run() override {
- if (!task_->Run())
- task_.release();
-
- CritScope lock(&lock_);
- if (reply_queue_)
- reply_queue_->PostTask(std::move(reply_));
- return true;
- }
-
- CriticalSection lock_;
- std::unique_ptr<QueuedTask> task_;
- std::unique_ptr<QueuedTask> reply_;
- TaskQueue* reply_queue_ GUARDED_BY(lock_);
-};
-
-class TaskQueue::SetTimerTask : public QueuedTask {
- public:
- SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
- : task_(std::move(task)),
- milliseconds_(milliseconds),
- posted_(Time32()) {}
-
- private:
- bool Run() override {
- // Compensate for the time that has passed since construction
- // and until we got here.
- uint32_t post_time = Time32() - posted_;
- TaskQueue::Current()->PostDelayedTask(
- std::move(task_),
- post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
- return true;
- }
-
- std::unique_ptr<QueuedTask> task_;
- const uint32_t milliseconds_;
- const uint32_t posted_;
-};
-
-TaskQueue::TaskQueue(const char* queue_name)
- : event_base_(event_base_new()),
- wakeup_event_(new event()),
- thread_(&TaskQueue::ThreadMain, this, queue_name) {
- RTC_DCHECK(queue_name);
- int fds[2];
- RTC_CHECK(pipe(fds) == 0);
- SetNonBlocking(fds[0]);
- SetNonBlocking(fds[1]);
- wakeup_pipe_out_ = fds[0];
- wakeup_pipe_in_ = fds[1];
- event_set(wakeup_event_.get(), wakeup_pipe_out_, EV_READ | EV_PERSIST,
- OnWakeup, this);
- event_base_set(event_base_, wakeup_event_.get());
- event_add(wakeup_event_.get(), 0);
- thread_.Start();
-}
-
-TaskQueue::~TaskQueue() {
- RTC_DCHECK(!IsCurrent());
- struct timespec ts;
- char message = kQuit;
- while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
- // The queue is full, so we have no choice but to wait and retry.
- RTC_CHECK_EQ(EAGAIN, errno);
- ts.tv_sec = 0;
- ts.tv_nsec = 1000000;
- nanosleep(&ts, nullptr);
- }
-
- thread_.Stop();
-
- event_del(wakeup_event_.get());
- close(wakeup_pipe_in_);
- close(wakeup_pipe_out_);
- wakeup_pipe_in_ = -1;
- wakeup_pipe_out_ = -1;
-
- {
- // Synchronize against any pending reply tasks that might be running on
- // other queues.
- CritScope lock(&pending_lock_);
- for (auto* reply : pending_replies_)
- reply->OnReplyQueueGone();
- pending_replies_.clear();
- }
-
- event_base_free(event_base_);
-}
-
-// static
-TaskQueue* TaskQueue::Current() {
- QueueContext* ctx =
- static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
- return ctx ? ctx->queue : nullptr;
-}
-
-// static
-bool TaskQueue::IsCurrent(const char* queue_name) {
- TaskQueue* current = Current();
- return current && current->thread_.name().compare(queue_name) == 0;
-}
-
-bool TaskQueue::IsCurrent() const {
- return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- RTC_DCHECK(task.get());
- // libevent isn't thread safe. This means that we can't use methods such
- // as event_base_once to post tasks to the worker thread from a different
- // thread. However, we can use it when posting from the worker thread itself.
- if (IsCurrent()) {
- if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
- task.get(), nullptr) == 0) {
- task.release();
- }
- } else {
- QueuedTask* task_id = task.get(); // Only used for comparison.
- {
- CritScope lock(&pending_lock_);
- pending_.push_back(std::move(task));
- }
- char message = kRunTask;
- if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
- LOG(WARNING) << "Failed to queue task.";
- CritScope lock(&pending_lock_);
- pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
- return t.get() == task_id;
- });
- }
- }
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- if (IsCurrent()) {
- TimerEvent* timer = new TimerEvent(std::move(task));
- evtimer_set(&timer->ev, &TaskQueue::RunTimer, timer);
- event_base_set(event_base_, &timer->ev);
- QueueContext* ctx =
- static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
- ctx->pending_timers_.push_back(timer);
- timeval tv = {milliseconds / 1000, (milliseconds % 1000) * 1000};
- event_add(&timer->ev, &tv);
- } else {
- PostTask(std::unique_ptr<QueuedTask>(
- new SetTimerTask(std::move(task), milliseconds)));
- }
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue) {
- std::unique_ptr<QueuedTask> wrapper_task(
- new PostAndReplyTask(std::move(task), std::move(reply), reply_queue));
- PostTask(std::move(wrapper_task));
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply) {
- return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
-// static
-bool TaskQueue::ThreadMain(void* context) {
- TaskQueue* me = static_cast<TaskQueue*>(context);
-
- QueueContext queue_context(me);
- pthread_setspecific(GetQueuePtrTls(), &queue_context);
-
- while (queue_context.is_active)
- event_base_loop(me->event_base_, 0);
-
- pthread_setspecific(GetQueuePtrTls(), nullptr);
-
- for (TimerEvent* timer : queue_context.pending_timers_)
- delete timer;
-
- return false;
-}
-
-// static
-void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
- QueueContext* ctx =
- static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
- RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
- char buf;
- RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
- switch (buf) {
- case kQuit:
- ctx->is_active = false;
- event_base_loopbreak(ctx->queue->event_base_);
- break;
- case kRunTask: {
- std::unique_ptr<QueuedTask> task;
- {
- CritScope lock(&ctx->queue->pending_lock_);
- RTC_DCHECK(!ctx->queue->pending_.empty());
- task = std::move(ctx->queue->pending_.front());
- ctx->queue->pending_.pop_front();
- RTC_DCHECK(task.get());
- }
- if (!task->Run())
- task.release();
- break;
- }
- default:
- RTC_NOTREACHED();
- break;
- }
-}
-
-// static
-void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
- auto* task = static_cast<QueuedTask*>(context);
- if (task->Run())
- delete task;
-}
-
-// static
-void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
- TimerEvent* timer = static_cast<TimerEvent*>(context);
- if (!timer->task->Run())
- timer->task.release();
- QueueContext* ctx =
- static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
- ctx->pending_timers_.remove(timer);
- delete timer;
-}
-
-void TaskQueue::PrepareReplyTask(PostAndReplyTask* reply_task) {
- RTC_DCHECK(reply_task);
- CritScope lock(&pending_lock_);
- pending_replies_.push_back(reply_task);
-}
-
-void TaskQueue::ReplyTaskDone(PostAndReplyTask* reply_task) {
- CritScope lock(&pending_lock_);
- pending_replies_.remove(reply_task);
-}
-
-} // namespace rtc
diff --git a/webrtc/base/task_queue_posix.cc b/webrtc/base/task_queue_posix.cc
deleted file mode 100644
index 3b00ac8..0000000
--- a/webrtc/base/task_queue_posix.cc
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "webrtc/base/task_queue_posix.h"
-
-#include "webrtc/base/checks.h"
-#include "webrtc/base/task_queue.h"
-
-namespace rtc {
-namespace internal {
-pthread_key_t g_queue_ptr_tls = 0;
-
-void InitializeTls() {
- RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0);
-}
-
-pthread_key_t GetQueuePtrTls() {
- static pthread_once_t init_once = PTHREAD_ONCE_INIT;
- RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0);
- return g_queue_ptr_tls;
-}
-
-AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q)
- : prev_(TaskQueue::Current()) {
- pthread_setspecific(GetQueuePtrTls(), q);
-}
-
-AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() {
- pthread_setspecific(GetQueuePtrTls(), prev_);
-}
-
-} // namespace internal
-} // namespace rtc
diff --git a/webrtc/base/task_queue_posix.h b/webrtc/base/task_queue_posix.h
deleted file mode 100644
index b677b78..0000000
--- a/webrtc/base/task_queue_posix.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef WEBRTC_BASE_TASK_QUEUE_POSIX_H_
-#define WEBRTC_BASE_TASK_QUEUE_POSIX_H_
-
-#include <pthread.h>
-
-namespace rtc {
-
-class TaskQueue;
-
-namespace internal {
-
-class AutoSetCurrentQueuePtr {
- public:
- explicit AutoSetCurrentQueuePtr(TaskQueue* q);
- ~AutoSetCurrentQueuePtr();
-
- private:
- TaskQueue* const prev_;
-};
-
-pthread_key_t GetQueuePtrTls();
-
-} // namespace internal
-} // namespace rtc
-
-#endif // WEBRTC_BASE_TASK_QUEUE_POSIX_H_
diff --git a/webrtc/base/task_queue_unittest.cc b/webrtc/base/task_queue_unittest.cc
deleted file mode 100644
index db4e6c2..0000000
--- a/webrtc/base/task_queue_unittest.cc
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include <memory>
-#include <vector>
-
-#include "webrtc/base/bind.h"
-#include "webrtc/base/event.h"
-#include "webrtc/base/gunit.h"
-#include "webrtc/base/task_queue.h"
-#include "webrtc/base/timeutils.h"
-
-namespace rtc {
-
-namespace {
-void CheckCurrent(const char* expected_queue, Event* signal, TaskQueue* queue) {
- EXPECT_TRUE(TaskQueue::IsCurrent(expected_queue));
- EXPECT_TRUE(queue->IsCurrent());
- if (signal)
- signal->Set();
-}
-
-} // namespace
-
-TEST(TaskQueueTest, Construct) {
- static const char kQueueName[] = "Construct";
- TaskQueue queue(kQueueName);
- EXPECT_FALSE(queue.IsCurrent());
-}
-
-TEST(TaskQueueTest, PostAndCheckCurrent) {
- static const char kQueueName[] = "PostAndCheckCurrent";
- TaskQueue queue(kQueueName);
-
- // We're not running a task, so there shouldn't be a current queue.
- EXPECT_FALSE(queue.IsCurrent());
- EXPECT_FALSE(TaskQueue::Current());
-
- Event event(false, false);
- queue.PostTask(Bind(&CheckCurrent, kQueueName, &event, &queue));
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostCustomTask) {
- static const char kQueueName[] = "PostCustomImplementation";
- TaskQueue queue(kQueueName);
-
- Event event(false, false);
-
- class CustomTask : public QueuedTask {
- public:
- explicit CustomTask(Event* event) : event_(event) {}
-
- private:
- bool Run() override {
- event_->Set();
- return false; // Never allows the task to be deleted by the queue.
- }
-
- Event* const event_;
- } my_task(&event);
-
- // Please don't do this in production code! :)
- queue.PostTask(std::unique_ptr<QueuedTask>(&my_task));
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostLambda) {
- static const char kQueueName[] = "PostLambda";
- TaskQueue queue(kQueueName);
-
- Event event(false, false);
- queue.PostTask([&event]() { event.Set(); });
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostFromQueue) {
- static const char kQueueName[] = "PostFromQueue";
- TaskQueue queue(kQueueName);
-
- Event event(false, false);
- queue.PostTask(
- [&event, &queue]() { queue.PostTask([&event]() { event.Set(); }); });
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostDelayed) {
- static const char kQueueName[] = "PostDelayed";
- TaskQueue queue(kQueueName);
-
- Event event(false, false);
- uint32_t start = Time();
- queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
- EXPECT_TRUE(event.Wait(1000));
- uint32_t end = Time();
- EXPECT_GE(end - start, 100u);
- EXPECT_NEAR(end - start, 200u, 100u); // Accept 100-300.
-}
-
-TEST(TaskQueueTest, PostMultipleDelayed) {
- static const char kQueueName[] = "PostMultipleDelayed";
- TaskQueue queue(kQueueName);
-
- std::vector<std::unique_ptr<Event>> events;
- for (int i = 0; i < 10; ++i) {
- events.push_back(std::unique_ptr<Event>(new Event(false, false)));
- queue.PostDelayedTask(
- Bind(&CheckCurrent, kQueueName, events.back().get(), &queue), 10);
- }
-
- for (const auto& e : events)
- EXPECT_TRUE(e->Wait(100));
-}
-
-TEST(TaskQueueTest, PostDelayedAfterDestruct) {
- static const char kQueueName[] = "PostDelayedAfterDestruct";
- Event event(false, false);
- {
- TaskQueue queue(kQueueName);
- queue.PostDelayedTask(Bind(&CheckCurrent, kQueueName, &event, &queue), 100);
- }
- EXPECT_FALSE(event.Wait(200)); // Task should not run.
-}
-
-TEST(TaskQueueTest, PostAndReply) {
- static const char kPostQueue[] = "PostQueue";
- static const char kReplyQueue[] = "ReplyQueue";
- TaskQueue post_queue(kPostQueue);
- TaskQueue reply_queue(kReplyQueue);
-
- Event event(false, false);
- post_queue.PostTaskAndReply(
- Bind(&CheckCurrent, kPostQueue, nullptr, &post_queue),
- Bind(&CheckCurrent, kReplyQueue, &event, &reply_queue), &reply_queue);
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostAndReuse) {
- static const char kPostQueue[] = "PostQueue";
- static const char kReplyQueue[] = "ReplyQueue";
- TaskQueue post_queue(kPostQueue);
- TaskQueue reply_queue(kReplyQueue);
-
- int call_count = 0;
-
- class ReusedTask : public QueuedTask {
- public:
- ReusedTask(int* counter, TaskQueue* reply_queue, Event* event)
- : counter_(counter), reply_queue_(reply_queue), event_(event) {
- EXPECT_EQ(0, *counter_);
- }
-
- private:
- bool Run() override {
- if (++(*counter_) == 1) {
- std::unique_ptr<QueuedTask> myself(this);
- reply_queue_->PostTask(std::move(myself));
- // At this point, the object is owned by reply_queue_ and it's
- // theoratically possible that the object has been deleted (e.g. if
- // posting wasn't possible). So, don't touch any member variables here.
-
- // Indicate to the current queue that ownership has been transferred.
- return false;
- } else {
- EXPECT_EQ(2, *counter_);
- EXPECT_TRUE(reply_queue_->IsCurrent());
- event_->Set();
- return true; // Indicate that the object should be deleted.
- }
- }
-
- int* const counter_;
- TaskQueue* const reply_queue_;
- Event* const event_;
- };
-
- Event event(false, false);
- std::unique_ptr<QueuedTask> task(
- new ReusedTask(&call_count, &reply_queue, &event));
-
- post_queue.PostTask(std::move(task));
- EXPECT_TRUE(event.Wait(1000));
-}
-
-TEST(TaskQueueTest, PostAndReplyLambda) {
- static const char kPostQueue[] = "PostQueue";
- static const char kReplyQueue[] = "ReplyQueue";
- TaskQueue post_queue(kPostQueue);
- TaskQueue reply_queue(kReplyQueue);
-
- Event event(false, false);
- bool my_flag = false;
- post_queue.PostTaskAndReply([&my_flag]() { my_flag = true; },
- [&event]() { event.Set(); }, &reply_queue);
- EXPECT_TRUE(event.Wait(1000));
- EXPECT_TRUE(my_flag);
-}
-
-void TestPostTaskAndReply(TaskQueue* work_queue,
- const char* work_queue_name,
- Event* event) {
- ASSERT_FALSE(work_queue->IsCurrent());
- work_queue->PostTaskAndReply(
- Bind(&CheckCurrent, work_queue_name, nullptr, work_queue),
- NewClosure([event]() { event->Set(); }));
-}
-
-// Does a PostTaskAndReply from within a task to post and reply to the current
-// queue. All in all there will be 3 tasks posted and run.
-TEST(TaskQueueTest, PostAndReply2) {
- static const char kQueueName[] = "PostAndReply2";
- static const char kWorkQueueName[] = "PostAndReply2_Worker";
- TaskQueue queue(kQueueName);
- TaskQueue work_queue(kWorkQueueName);
-
- Event event(false, false);
- queue.PostTask(
- Bind(&TestPostTaskAndReply, &work_queue, kWorkQueueName, &event));
- EXPECT_TRUE(event.Wait(1000));
-}
-
-// Tests posting more messages than a queue can queue up.
-// In situations like that, tasks will get dropped.
-TEST(TaskQueueTest, PostALot) {
- // To destruct the event after the queue has gone out of scope.
- Event event(false, false);
-
- int tasks_executed = 0;
- int tasks_cleaned_up = 0;
- static const int kTaskCount = 0xffff;
-
- {
- static const char kQueueName[] = "PostALot";
- TaskQueue queue(kQueueName);
-
- // On linux, the limit of pending bytes in the pipe buffer is 0xffff.
- // So here we post a total of 0xffff+1 messages, which triggers a failure
- // case inside of the libevent queue implementation.
-
- queue.PostTask([&event]() { event.Wait(Event::kForever); });
- for (int i = 0; i < kTaskCount; ++i)
- queue.PostTask(NewClosure([&tasks_executed]() { ++tasks_executed; },
- [&tasks_cleaned_up]() { ++tasks_cleaned_up; }));
- event.Set(); // Unblock the first task.
- }
-
- EXPECT_GE(tasks_cleaned_up, tasks_executed);
- EXPECT_EQ(kTaskCount, tasks_cleaned_up);
-
- LOG(INFO) << "tasks executed: " << tasks_executed
- << ", tasks cleaned up: " << tasks_cleaned_up;
-}
-
-} // namespace rtc
diff --git a/webrtc/base/task_queue_win.cc b/webrtc/base/task_queue_win.cc
deleted file mode 100644
index 5ae6d92..0000000
--- a/webrtc/base/task_queue_win.cc
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Copyright 2016 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "webrtc/base/task_queue.h"
-
-#include <string.h>
-#include <unordered_map>
-
-#include "webrtc/base/checks.h"
-#include "webrtc/base/logging.h"
-
-namespace rtc {
-namespace {
-#define WM_RUN_TASK WM_USER + 1
-#define WM_QUEUE_DELAYED_TASK WM_USER + 2
-
-DWORD g_queue_ptr_tls = 0;
-
-BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
- g_queue_ptr_tls = TlsAlloc();
- return TRUE;
-}
-
-DWORD GetQueuePtrTls() {
- static INIT_ONCE init_once = INIT_ONCE_STATIC_INIT;
- InitOnceExecuteOnce(&init_once, InitializeTls, nullptr, nullptr);
- return g_queue_ptr_tls;
-}
-
-struct ThreadStartupData {
- Event* started;
- void* thread_context;
-};
-
-void CALLBACK InitializeQueueThread(ULONG_PTR param) {
- MSG msg;
- PeekMessage(&msg, NULL, WM_USER, WM_USER, PM_NOREMOVE);
- ThreadStartupData* data = reinterpret_cast<ThreadStartupData*>(param);
- TlsSetValue(GetQueuePtrTls(), data->thread_context);
- data->started->Set();
-}
-} // namespace
-
-TaskQueue::TaskQueue(const char* queue_name)
- : thread_(&TaskQueue::ThreadMain, this, queue_name) {
- RTC_DCHECK(queue_name);
- thread_.Start();
- Event event(false, false);
- ThreadStartupData startup = {&event, this};
- RTC_CHECK(thread_.QueueAPC(&InitializeQueueThread,
- reinterpret_cast<ULONG_PTR>(&startup)));
- event.Wait(Event::kForever);
-}
-
-TaskQueue::~TaskQueue() {
- RTC_DCHECK(!IsCurrent());
- while (!PostThreadMessage(thread_.GetThreadRef(), WM_QUIT, 0, 0)) {
- RTC_CHECK(ERROR_NOT_ENOUGH_QUOTA == ::GetLastError());
- Sleep(1);
- }
- thread_.Stop();
-}
-
-// static
-TaskQueue* TaskQueue::Current() {
- return static_cast<TaskQueue*>(TlsGetValue(GetQueuePtrTls()));
-}
-
-// static
-bool TaskQueue::IsCurrent(const char* queue_name) {
- TaskQueue* current = Current();
- return current && current->thread_.name().compare(queue_name) == 0;
-}
-
-bool TaskQueue::IsCurrent() const {
- return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
-}
-
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
- if (PostThreadMessage(thread_.GetThreadRef(), WM_RUN_TASK, 0,
- reinterpret_cast<LPARAM>(task.get()))) {
- task.release();
- }
-}
-
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- WPARAM wparam;
-#if defined(_WIN64)
- // GetTickCount() returns a fairly coarse tick count (resolution or about 8ms)
- // so this compensation isn't that accurate, but since we have unused 32 bits
- // on Win64, we might as well use them.
- wparam = (static_cast<WPARAM>(::GetTickCount()) << 32) | milliseconds;
-#else
- wparam = milliseconds;
-#endif
- if (PostThreadMessage(thread_.GetThreadRef(), WM_QUEUE_DELAYED_TASK, wparam,
- reinterpret_cast<LPARAM>(task.get()))) {
- task.release();
- }
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue) {
- QueuedTask* task_ptr = task.release();
- QueuedTask* reply_task_ptr = reply.release();
- DWORD reply_thread_id = reply_queue->thread_.GetThreadRef();
- PostTask([task_ptr, reply_task_ptr, reply_thread_id]() {
- if (task_ptr->Run())
- delete task_ptr;
- // If the thread's message queue is full, we can't queue the task and will
- // have to drop it (i.e. delete).
- if (!PostThreadMessage(reply_thread_id, WM_RUN_TASK, 0,
- reinterpret_cast<LPARAM>(reply_task_ptr))) {
- delete reply_task_ptr;
- }
- });
-}
-
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply) {
- return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
-// static
-bool TaskQueue::ThreadMain(void* context) {
- std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> delayed_tasks;
-
- BOOL ret;
- MSG msg;
-
- while ((ret = GetMessage(&msg, nullptr, 0, 0)) != 0 && ret != -1) {
- if (!msg.hwnd) {
- switch (msg.message) {
- case WM_RUN_TASK: {
- QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
- if (task->Run())
- delete task;
- break;
- }
- case WM_QUEUE_DELAYED_TASK: {
- QueuedTask* task = reinterpret_cast<QueuedTask*>(msg.lParam);
- uint32_t milliseconds = msg.wParam & 0xFFFFFFFF;
-#if defined(_WIN64)
- // Subtract the time it took to queue the timer.
- const DWORD now = GetTickCount();
- DWORD post_time = now - (msg.wParam >> 32);
- milliseconds =
- post_time > milliseconds ? 0 : milliseconds - post_time;
-#endif
- UINT_PTR timer_id = SetTimer(nullptr, 0, milliseconds, nullptr);
- delayed_tasks.insert(std::make_pair(timer_id, task));
- break;
- }
- case WM_TIMER: {
- KillTimer(nullptr, msg.wParam);
- auto found = delayed_tasks.find(msg.wParam);
- RTC_DCHECK(found != delayed_tasks.end());
- if (!found->second->Run())
- found->second.release();
- delayed_tasks.erase(found);
- break;
- }
- default:
- RTC_NOTREACHED();
- break;
- }
- } else {
- TranslateMessage(&msg);
- DispatchMessage(&msg);
- }
- }
-
- return false;
-}
-} // namespace rtc