This cl refactor TaskQueues to use a PIMPL implementation on linux/Android.
In later steps the Win/Mac implementation will also be refactored.
The rtc_task_queue target is split up in three separate targets:
rtc_task_queue_api:
Contains the header file task_queue.h but no implementation.
Only external TaskQueue implementations should directly depend on this target.
rtc_task_queue_impl:
Contains the default implementation of task_queue.h.
Only external application targets should directly depend on this target.
rtc_task_queue:
WebRTC targets should depend on this target. It unconditionally depend on rtc_task_queue_api and depending on the new build flag,|rtc_link_task_queue_impl|, depend on rtc_task_queue_impl.
BUG=webrtc:8160
Review-Url: https://codereview.webrtc.org/3003643002
Cr-Original-Commit-Position: refs/heads/master@{#19516}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: 650fdae91c690fb9904f72873daeb81415ff9ba1
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index a9e82e5..d26d5e7 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -295,42 +295,71 @@
defines = [ "WEBRTC_BUILD_LIBEVENT" ]
}
-rtc_static_library("rtc_task_queue") {
+rtc_source_set("rtc_task_queue") {
public_deps = [
":rtc_base_approved",
+ ":rtc_task_queue_api",
]
+ if (rtc_link_task_queue_impl) {
+ deps = [
+ ":rtc_task_queue_impl",
+ ]
+ }
+}
+
+# WebRTC targets must not directly depend on rtc_task_queue_api or
+# rtc_task_queue_impl. Instead, depend on rtc_task_queue.
+# The build flag |rtc_link_task_queue_impl| decides if WebRTC targets will link
+# to the default implemenation in rtc_task_queue_impl or if an externally
+# provided implementation should be used. An external implementation should
+# depend on rtc_task_queue_api.
+rtc_source_set("rtc_task_queue_api") {
if (build_with_chromium) {
sources = [
- "../../webrtc_overrides/webrtc/rtc_base/task_queue.cc",
"../../webrtc_overrides/webrtc/rtc_base/task_queue.h",
]
} else {
sources = [
"task_queue.h",
- "task_queue_posix.h",
]
- if (rtc_build_libevent) {
- deps = [
- "//base/third_party/libevent",
- ]
- }
+ }
+ deps = [
+ ":rtc_base_approved",
+ ]
+}
+rtc_source_set("rtc_task_queue_impl") {
+ deps = [
+ ":rtc_base_approved",
+ ":rtc_task_queue_api",
+ ]
+ if (build_with_chromium) {
+ sources = [
+ "../../webrtc_overrides/webrtc/rtc_base/task_queue.cc",
+ ]
+ } else {
+ if (rtc_build_libevent) {
+ deps += [ "//base/third_party/libevent" ]
+ }
if (rtc_enable_libevent) {
- sources += [
+ sources = [
"task_queue_libevent.cc",
"task_queue_posix.cc",
+ "task_queue_posix.h",
]
all_dependent_configs = [ ":enable_libevent_config" ]
} else {
if (is_mac || is_ios) {
- sources += [
+ sources = [
"task_queue_gcd.cc",
"task_queue_posix.cc",
]
}
if (is_win) {
- sources += [ "task_queue_win.cc" ]
+ sources = [
+ "task_queue_win.cc",
+ ]
}
}
}
diff --git a/rtc_base/task_queue.h b/rtc_base/task_queue.h
index 218fce9..e7eac2f 100644
--- a/rtc_base/task_queue.h
+++ b/rtc_base/task_queue.h
@@ -15,23 +15,16 @@
#include <memory>
#include <queue>
-#if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT)
+#if defined(WEBRTC_MAC)
#include <dispatch/dispatch.h>
#endif
#include "webrtc/rtc_base/constructormagic.h"
#include "webrtc/rtc_base/criticalsection.h"
-
-#if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT)
-#include "webrtc/rtc_base/platform_thread.h"
-#endif
-
-#if defined(WEBRTC_BUILD_LIBEVENT)
-#include "webrtc/rtc_base/refcountedobject.h"
#include "webrtc/rtc_base/scoped_ref_ptr.h"
-struct event_base;
-struct event;
+#if defined(WEBRTC_WIN)
+#include "webrtc/rtc_base/platform_thread.h"
#endif
namespace rtc {
@@ -242,32 +235,7 @@
}
private:
-#if defined(WEBRTC_BUILD_LIBEVENT)
- static void ThreadMain(void* context);
- static void OnWakeup(int socket, short flags, void* context); // NOLINT
- static void RunTask(int fd, short flags, void* context); // NOLINT
- static void RunTimer(int fd, short flags, void* context); // NOLINT
-
- class ReplyTaskOwner;
- class PostAndReplyTask;
- class SetTimerTask;
-
- typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
-
- void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
-
- struct QueueContext;
-
- int wakeup_pipe_in_ = -1;
- int wakeup_pipe_out_ = -1;
- event_base* event_base_;
- std::unique_ptr<event> wakeup_event_;
- PlatformThread thread_;
- rtc::CriticalSection pending_lock_;
- std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
- std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
- GUARDED_BY(pending_lock_);
-#elif defined(WEBRTC_MAC)
+#if defined(WEBRTC_MAC)
struct QueueContext;
struct TaskContext;
struct PostTaskAndReplyContext;
@@ -295,7 +263,8 @@
std::queue<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
HANDLE in_queue_;
#else
-#error not supported.
+ class Impl;
+ const scoped_refptr<Impl> impl_;
#endif
RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue);
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index db267dc..99b88df 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -18,7 +18,11 @@
#include "base/third_party/libevent/event.h"
#include "webrtc/rtc_base/checks.h"
#include "webrtc/rtc_base/logging.h"
+#include "webrtc/rtc_base/platform_thread.h"
+#include "webrtc/rtc_base/refcount.h"
+#include "webrtc/rtc_base/refcountedobject.h"
#include "webrtc/rtc_base/safe_conversions.h"
+#include "webrtc/rtc_base/task_queue.h"
#include "webrtc/rtc_base/task_queue_posix.h"
#include "webrtc/rtc_base/timeutils.h"
@@ -104,9 +108,57 @@
}
} // namespace
-struct TaskQueue::QueueContext {
- explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {}
- TaskQueue* queue;
+class TaskQueue::Impl : public RefCountInterface {
+ public:
+ explicit Impl(const char* queue_name,
+ TaskQueue* queue,
+ Priority priority = Priority::NORMAL);
+ ~Impl() override;
+
+ static TaskQueue::Impl* Current();
+ static TaskQueue* CurrentQueue();
+
+ // Used for DCHECKing the current queue.
+ static bool IsCurrent(const char* queue_name);
+ bool IsCurrent() const;
+
+ void PostTask(std::unique_ptr<QueuedTask> task);
+ void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue::Impl* reply_queue);
+
+ void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
+
+ private:
+ static void ThreadMain(void* context);
+ static void OnWakeup(int socket, short flags, void* context); // NOLINT
+ static void RunTask(int fd, short flags, void* context); // NOLINT
+ static void RunTimer(int fd, short flags, void* context); // NOLINT
+
+ class ReplyTaskOwner;
+ class PostAndReplyTask;
+ class SetTimerTask;
+
+ typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
+
+ void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
+
+ struct QueueContext;
+ TaskQueue* const queue_;
+ int wakeup_pipe_in_ = -1;
+ int wakeup_pipe_out_ = -1;
+ event_base* event_base_;
+ std::unique_ptr<event> wakeup_event_;
+ PlatformThread thread_;
+ rtc::CriticalSection pending_lock_;
+ std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
+ std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
+ GUARDED_BY(pending_lock_);
+};
+
+struct TaskQueue::Impl::QueueContext {
+ explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
+ TaskQueue::Impl* queue;
bool is_active;
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
@@ -135,7 +187,7 @@
// * if set_should_run_task() was called, the reply task will be run
// * Release the reference to ReplyTaskOwner
// * ReplyTaskOwner and associated |reply_| are deleted.
-class TaskQueue::ReplyTaskOwner {
+class TaskQueue::Impl::ReplyTaskOwner {
public:
ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
: reply_(std::move(reply)) {}
@@ -159,11 +211,11 @@
bool run_task_ = false;
};
-class TaskQueue::PostAndReplyTask : public QueuedTask {
+class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
public:
PostAndReplyTask(std::unique_ptr<QueuedTask> task,
std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue,
+ TaskQueue::Impl* reply_queue,
int reply_pipe)
: task_(std::move(task)),
reply_pipe_(reply_pipe),
@@ -196,7 +248,7 @@
scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
};
-class TaskQueue::SetTimerTask : public QueuedTask {
+class TaskQueue::Impl::SetTimerTask : public QueuedTask {
public:
SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
: task_(std::move(task)),
@@ -208,7 +260,7 @@
// Compensate for the time that has passed since construction
// and until we got here.
uint32_t post_time = Time32() - posted_;
- TaskQueue::Current()->PostDelayedTask(
+ TaskQueue::Impl::Current()->PostDelayedTask(
std::move(task_),
post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
return true;
@@ -219,10 +271,13 @@
const uint32_t posted_;
};
-TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
- : event_base_(event_base_new()),
+TaskQueue::Impl::Impl(const char* queue_name,
+ TaskQueue* queue,
+ Priority priority /*= NORMAL*/)
+ : queue_(queue),
+ event_base_(event_base_new()),
wakeup_event_(new event()),
- thread_(&TaskQueue::ThreadMain,
+ thread_(&TaskQueue::Impl::ThreadMain,
this,
queue_name,
TaskQueuePriorityToThreadPriority(priority)) {
@@ -240,7 +295,7 @@
thread_.Start();
}
-TaskQueue::~TaskQueue() {
+TaskQueue::Impl::~Impl() {
RTC_DCHECK(!IsCurrent());
struct timespec ts;
char message = kQuit;
@@ -267,29 +322,38 @@
}
// static
-TaskQueue* TaskQueue::Current() {
+TaskQueue::Impl* TaskQueue::Impl::Current() {
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
return ctx ? ctx->queue : nullptr;
}
// static
-bool TaskQueue::IsCurrent(const char* queue_name) {
- TaskQueue* current = Current();
+TaskQueue* TaskQueue::Impl::CurrentQueue() {
+ TaskQueue::Impl* current = Current();
+ if (current) {
+ return current->queue_;
+ }
+ return nullptr;
+}
+
+// static
+bool TaskQueue::Impl::IsCurrent(const char* queue_name) {
+ TaskQueue::Impl* current = Current();
return current && current->thread_.name().compare(queue_name) == 0;
}
-bool TaskQueue::IsCurrent() const {
+bool TaskQueue::Impl::IsCurrent() const {
return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
}
-void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
RTC_DCHECK(task.get());
// libevent isn't thread safe. This means that we can't use methods such
// as event_base_once to post tasks to the worker thread from a different
// thread. However, we can use it when posting from the worker thread itself.
if (IsCurrent()) {
- if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask,
+ if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
task.get(), nullptr) == 0) {
task.release();
}
@@ -310,11 +374,12 @@
}
}
-void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
+void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
if (IsCurrent()) {
TimerEvent* timer = new TimerEvent(std::move(task));
- EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer);
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
+ timer);
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
ctx->pending_timers_.push_back(timer);
@@ -327,23 +392,18 @@
}
}
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply,
- TaskQueue* reply_queue) {
+void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue::Impl* reply_queue) {
std::unique_ptr<QueuedTask> wrapper_task(
new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
reply_queue->wakeup_pipe_in_));
PostTask(std::move(wrapper_task));
}
-void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
- std::unique_ptr<QueuedTask> reply) {
- return PostTaskAndReply(std::move(task), std::move(reply), Current());
-}
-
// static
-void TaskQueue::ThreadMain(void* context) {
- TaskQueue* me = static_cast<TaskQueue*>(context);
+void TaskQueue::Impl::ThreadMain(void* context) {
+ TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
QueueContext queue_context(me);
pthread_setspecific(GetQueuePtrTls(), &queue_context);
@@ -358,7 +418,9 @@
}
// static
-void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT
+void TaskQueue::Impl::OnWakeup(int socket,
+ short flags,
+ void* context) { // NOLINT
QueueContext* ctx =
static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
@@ -405,14 +467,14 @@
}
// static
-void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT
+void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
auto* task = static_cast<QueuedTask*>(context);
if (task->Run())
delete task;
}
// static
-void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT
+void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
TimerEvent* timer = static_cast<TimerEvent*>(context);
if (!timer->task->Run())
timer->task.release();
@@ -422,10 +484,54 @@
delete timer;
}
-void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) {
+void TaskQueue::Impl::PrepareReplyTask(
+ scoped_refptr<ReplyTaskOwnerRef> reply_task) {
RTC_DCHECK(reply_task);
CritScope lock(&pending_lock_);
pending_replies_.push_back(std::move(reply_task));
}
+TaskQueue::TaskQueue(const char* queue_name, Priority priority)
+ : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
+}
+
+TaskQueue::~TaskQueue() {}
+
+// static
+TaskQueue* TaskQueue::Current() {
+ return TaskQueue::Impl::CurrentQueue();
+}
+
+// Used for DCHECKing the current queue.
+// static
+bool TaskQueue::IsCurrent(const char* queue_name) {
+ return TaskQueue::Impl::IsCurrent(queue_name);
+}
+
+bool TaskQueue::IsCurrent() const {
+ return impl_->IsCurrent();
+}
+
+void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
+ return TaskQueue::impl_->PostTask(std::move(task));
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply,
+ TaskQueue* reply_queue) {
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
+ reply_queue->impl_.get());
+}
+
+void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
+ std::unique_ptr<QueuedTask> reply) {
+ return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
+ impl_.get());
+}
+
+void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
+ uint32_t milliseconds) {
+ return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
+}
+
} // namespace rtc
diff --git a/webrtc.gni b/webrtc.gni
index 071c9cc..72e031e 100644
--- a/webrtc.gni
+++ b/webrtc.gni
@@ -105,7 +105,14 @@
# See http://clang.llvm.org/docs/SanitizerCoverage.html .
rtc_sanitize_coverage = ""
+ # Links a default implementation of task queues to targets
+ # that depend on the target rtc_task_queue. Set to false to
+ # use an external implementation.
+ rtc_link_task_queue_impl = true
+
# Enable libevent task queues on platforms that support it.
+ # rtc_link_task_queue_impl must be set to true for this to
+ # have an effect.
if (is_win || is_mac || is_ios || is_nacl) {
rtc_enable_libevent = false
rtc_build_libevent = false
@@ -314,6 +321,7 @@
"//build/config:exe_and_shlib_deps",
]
deps += invoker.deps
+
public_configs = [ rtc_common_inherited_config ]
if (defined(invoker.public_configs)) {
public_configs += invoker.public_configs