Add support for priorities to TaskQueue.
BUG=webrtc:7216
Review-Url: https://codereview.webrtc.org/2708353003
Cr-Original-Commit-Position: refs/heads/master@{#16834}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: c9bb7918f6c3dec9b3d583bbf2fc53e74ea875b6
diff --git a/base/task_queue.h b/base/task_queue.h
index 92a1c94..7d0d414 100644
--- a/base/task_queue.h
+++ b/base/task_queue.h
@@ -161,8 +161,16 @@
// so assumptions about lifetimes of pending tasks should not be made.
class LOCKABLE TaskQueue {
public:
- explicit TaskQueue(const char* queue_name);
- // TODO(tommi): Implement move semantics?
+ // TaskQueue priority levels. On some platforms these will map to thread
+ // priorities, on others such as Mac and iOS, GCD queue priorities.
+ enum class Priority {
+ NORMAL = 0,
+ HIGH,
+ LOW,
+ };
+
+ explicit TaskQueue(const char* queue_name,
+ Priority priority = Priority::NORMAL);
~TaskQueue();
static TaskQueue* Current();
@@ -275,8 +283,11 @@
class WorkerThread : public PlatformThread {
public:
- WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name)
- : PlatformThread(func, obj, thread_name) {}
+ WorkerThread(ThreadRunFunction func,
+ void* obj,
+ const char* thread_name,
+ ThreadPriority priority)
+ : PlatformThread(func, obj, thread_name, priority) {}
bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) {
return PlatformThread::QueueAPC(apc_function, data);
diff --git a/base/task_queue_gcd.cc b/base/task_queue_gcd.cc
index d5a4a6c..296da16 100644
--- a/base/task_queue_gcd.cc
+++ b/base/task_queue_gcd.cc
@@ -21,6 +21,22 @@
#include "webrtc/base/task_queue_posix.h"
namespace rtc {
+namespace {
+
+using Priority = TaskQueue::Priority;
+
+int TaskQueuePriorityToGCD(Priority priority) {
+ switch (priority) {
+ case Priority::NORMAL:
+ return DISPATCH_QUEUE_PRIORITY_DEFAULT;
+ case Priority::HIGH:
+ return DISPATCH_QUEUE_PRIORITY_HIGH;
+ case Priority::LOW:
+ return DISPATCH_QUEUE_PRIORITY_LOW;
+ }
+}
+}
+
using internal::GetQueuePtrTls;
using internal::AutoSetCurrentQueuePtr;
@@ -94,7 +110,7 @@
dispatch_queue_t reply_queue_;
};
-TaskQueue::TaskQueue(const char* queue_name)
+TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)),
context_(new QueueContext(this)) {
RTC_DCHECK(queue_name);
@@ -104,6 +120,9 @@
// to the queue is released. This may run after the TaskQueue object has
// been deleted.
dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext);
+
+ dispatch_set_target_queue(
+ queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0));
}
TaskQueue::~TaskQueue() {
diff --git a/base/task_queue_libevent.cc b/base/task_queue_libevent.cc
index c6ce5b3..1376ea3 100644
--- a/base/task_queue_libevent.cc
+++ b/base/task_queue_libevent.cc
@@ -30,6 +30,8 @@
static const char kRunTask = 2;
static const char kRunReplyTask = 3;
+using Priority = TaskQueue::Priority;
+
// This ignores the SIGPIPE signal on the calling thread.
// This signal can be fired when trying to write() to a pipe that's being
// closed or while closing a pipe that's being written to.
@@ -84,6 +86,21 @@
RTC_CHECK_EQ(0, event_base_set(base, ev));
#endif
}
+
+ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+ switch (priority) {
+ case Priority::HIGH:
+ return kRealtimePriority;
+ case Priority::LOW:
+ return kLowPriority;
+ case Priority::NORMAL:
+ return kNormalPriority;
+ default:
+ RTC_NOTREACHED();
+ break;
+ }
+ return kNormalPriority;
+}
} // namespace
struct TaskQueue::QueueContext {
@@ -201,10 +218,13 @@
const uint32_t posted_;
};
-TaskQueue::TaskQueue(const char* queue_name)
+TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
: event_base_(event_base_new()),
wakeup_event_(new event()),
- thread_(&TaskQueue::ThreadMain, this, queue_name) {
+ thread_(&TaskQueue::ThreadMain,
+ this,
+ queue_name,
+ TaskQueuePriorityToThreadPriority(priority)) {
RTC_DCHECK(queue_name);
int fds[2];
RTC_CHECK(pipe(fds) == 0);
diff --git a/base/task_queue_win.cc b/base/task_queue_win.cc
index c8ef721..5850b29 100644
--- a/base/task_queue_win.cc
+++ b/base/task_queue_win.cc
@@ -24,6 +24,8 @@
#define WM_RUN_TASK WM_USER + 1
#define WM_QUEUE_DELAYED_TASK WM_USER + 2
+using Priority = TaskQueue::Priority;
+
DWORD g_queue_ptr_tls = 0;
BOOL CALLBACK InitializeTls(PINIT_ONCE init_once, void* param, void** context) {
@@ -49,6 +51,21 @@
::TlsSetValue(GetQueuePtrTls(), data->thread_context);
data->started->Set();
}
+
+ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
+ switch (priority) {
+ case Priority::HIGH:
+ return kRealtimePriority;
+ case Priority::LOW:
+ return kLowPriority;
+ case Priority::NORMAL:
+ return kNormalPriority;
+ default:
+ RTC_NOTREACHED();
+ break;
+ }
+ return kNormalPriority;
+}
} // namespace
class TaskQueue::MultimediaTimer {
@@ -145,8 +162,11 @@
RTC_DISALLOW_COPY_AND_ASSIGN(MultimediaTimer);
};
-TaskQueue::TaskQueue(const char* queue_name)
- : thread_(&TaskQueue::ThreadMain, this, queue_name) {
+TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/)
+ : thread_(&TaskQueue::ThreadMain,
+ this,
+ queue_name,
+ TaskQueuePriorityToThreadPriority(priority)) {
RTC_DCHECK(queue_name);
thread_.Start();
Event event(false, false);