blob: 2b9d5a2ffc48941fee82afe5bb92682c683e3927 [file] [log] [blame]
Robin Raymond22027b92018-11-23 14:07:501/*
2 * Copyright 2018 The WebRTC Project Authors. All rights reserved.
3 *
4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree.
9 */
10
11#include "rtc_base/task_queue.h"
12
13#include <string.h>
14#include <algorithm>
15#include <atomic>
16#include <condition_variable>
17#include <map>
18#include <queue>
19#include <utility>
20
21#include "rtc_base/checks.h"
Steve Anton10542f22019-01-11 17:11:0022#include "rtc_base/critical_section.h"
Robin Raymond22027b92018-11-23 14:07:5023#include "rtc_base/event.h"
24#include "rtc_base/logging.h"
25#include "rtc_base/platform_thread.h"
Steve Anton10542f22019-01-11 17:11:0026#include "rtc_base/ref_count.h"
27#include "rtc_base/ref_counted_object.h"
Robin Raymond22027b92018-11-23 14:07:5028#include "rtc_base/thread_annotations.h"
Steve Anton10542f22019-01-11 17:11:0029#include "rtc_base/time_utils.h"
Robin Raymond22027b92018-11-23 14:07:5030
31namespace rtc {
32namespace {
33
34using Priority = TaskQueue::Priority;
35
36ThreadPriority TaskQueuePriorityToThreadPriority(Priority priority) {
37 switch (priority) {
38 case Priority::HIGH:
39 return kRealtimePriority;
40 case Priority::LOW:
41 return kLowPriority;
42 case Priority::NORMAL:
43 return kNormalPriority;
44 default:
45 RTC_NOTREACHED();
46 return kNormalPriority;
47 }
48 return kNormalPriority;
49}
50
51} // namespace
52
53class TaskQueue::Impl : public RefCountInterface {
54 public:
55 Impl(const char* queue_name, TaskQueue* queue, Priority priority);
56 ~Impl() override;
57
58 static TaskQueue::Impl* Current();
59 static TaskQueue* CurrentQueue();
60
61 // Used for DCHECKing the current queue.
62 bool IsCurrent() const;
63
64 template <class Closure,
65 typename std::enable_if<!std::is_convertible<
66 Closure,
67 std::unique_ptr<QueuedTask>>::value>::type* = nullptr>
68 void PostTask(Closure&& closure) {
69 PostTask(NewClosure(std::forward<Closure>(closure)));
70 }
71
72 void PostTask(std::unique_ptr<QueuedTask> task);
73 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
74 std::unique_ptr<QueuedTask> reply,
75 TaskQueue::Impl* reply_queue);
76
77 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
78
79 class WorkerThread : public PlatformThread {
80 public:
81 WorkerThread(ThreadRunFunction func,
82 void* obj,
83 const char* thread_name,
84 ThreadPriority priority)
85 : PlatformThread(func, obj, thread_name, priority) {}
86 };
87
88 using OrderId = uint64_t;
89
90 struct DelayedEntryTimeout {
91 int64_t next_fire_at_ms_{};
92 OrderId order_{};
93
94 bool operator<(const DelayedEntryTimeout& o) const {
95 return std::tie(next_fire_at_ms_, order_) <
96 std::tie(o.next_fire_at_ms_, o.order_);
97 }
98 };
99
100 struct NextTask {
101 bool final_task_{false};
102 std::unique_ptr<QueuedTask> run_task_;
103 int64_t sleep_time_ms_{};
104 };
105
106 protected:
107 NextTask GetNextTask();
108
109 private:
110 // The ThreadQueue::Current() method requires that the current thread
111 // returns the task queue if the current thread is the active task
112 // queue and this variable holds the value needed in thread_local to
113 // on the initialized worker thread holding the queue.
114 static thread_local TaskQueue::Impl* thread_context_;
115
116 static void ThreadMain(void* context);
117
118 void ProcessTasks();
119
120 void NotifyWake();
121
122 // The back pointer from the owner task queue object
123 // from this implementation detail.
124 TaskQueue* const queue_;
125
126 // Indicates if the thread has started.
127 Event started_;
128
129 // Indicates if the thread has stopped.
130 Event stopped_;
131
132 // Signaled whenever a new task is pending.
133 Event flag_notify_;
134
135 // Contains the active worker thread assigned to processing
136 // tasks (including delayed tasks).
137 WorkerThread thread_;
138
139 rtc::CriticalSection pending_lock_;
140
141 // Indicates if the worker thread needs to shutdown now.
142 bool thread_should_quit_ RTC_GUARDED_BY(pending_lock_){false};
143
144 // Holds the next order to use for the next task to be
145 // put into one of the pending queues.
146 OrderId thread_posting_order_ RTC_GUARDED_BY(pending_lock_){};
147
148 // The list of all pending tasks that need to be processed in the
149 // FIFO queue ordering on the worker thread.
150 std::queue<std::pair<OrderId, std::unique_ptr<QueuedTask>>> pending_queue_
151 RTC_GUARDED_BY(pending_lock_);
152
153 // The list of all pending tasks that need to be processed at a future
154 // time based upon a delay. On the off change the delayed task should
155 // happen at exactly the same time interval as another task then the
156 // task is processed based on FIFO ordering. std::priority_queue was
157 // considered but rejected due to its inability to extract the
158 // std::unique_ptr out of the queue without the presence of a hack.
159 std::map<DelayedEntryTimeout, std::unique_ptr<QueuedTask>> delayed_queue_
160 RTC_GUARDED_BY(pending_lock_);
161};
162
163// static
164thread_local TaskQueue::Impl* TaskQueue::Impl::thread_context_ = nullptr;
165
166TaskQueue::Impl::Impl(const char* queue_name,
167 TaskQueue* queue,
168 Priority priority)
169 : queue_(queue),
170 started_(/*manual_reset=*/false, /*initially_signaled=*/false),
171 stopped_(/*manual_reset=*/false, /*initially_signaled=*/false),
172 flag_notify_(/*manual_reset=*/false, /*initially_signaled=*/false),
173 thread_(&TaskQueue::Impl::ThreadMain,
174 this,
175 queue_name,
176 TaskQueuePriorityToThreadPriority(priority)) {
177 RTC_DCHECK(queue_name);
178 thread_.Start();
179 started_.Wait(Event::kForever);
180}
181
182TaskQueue::Impl::~Impl() {
183 RTC_DCHECK(!IsCurrent());
184
185 {
186 CritScope lock(&pending_lock_);
187 thread_should_quit_ = true;
188 }
189
190 NotifyWake();
191
192 stopped_.Wait(Event::kForever);
193 thread_.Stop();
194}
195
196// static
197TaskQueue::Impl* TaskQueue::Impl::Current() {
198 return thread_context_;
199}
200
201// static
202TaskQueue* TaskQueue::Impl::CurrentQueue() {
203 TaskQueue::Impl* current = Current();
204 return current ? current->queue_ : nullptr;
205}
206
207bool TaskQueue::Impl::IsCurrent() const {
208 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
209}
210
211void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
212 {
213 CritScope lock(&pending_lock_);
214 OrderId order = thread_posting_order_++;
215
216 pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
217 order, std::move(task)));
218 }
219
220 NotifyWake();
221}
222
223void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
224 uint32_t milliseconds) {
225 auto fire_at = rtc::TimeMillis() + milliseconds;
226
227 DelayedEntryTimeout delay;
228 delay.next_fire_at_ms_ = fire_at;
229
230 {
231 CritScope lock(&pending_lock_);
232 delay.order_ = ++thread_posting_order_;
233 delayed_queue_[delay] = std::move(task);
234 }
235
236 NotifyWake();
237}
238
239void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
240 std::unique_ptr<QueuedTask> reply,
241 TaskQueue::Impl* reply_queue) {
242 QueuedTask* task_ptr = task.release();
243 QueuedTask* reply_task_ptr = reply.release();
244 PostTask([task_ptr, reply_task_ptr, reply_queue]() {
245 if (task_ptr->Run())
246 delete task_ptr;
247
248 reply_queue->PostTask(std::unique_ptr<QueuedTask>(reply_task_ptr));
249 });
250}
251
252TaskQueue::Impl::NextTask TaskQueue::Impl::GetNextTask() {
253 NextTask result{};
254
255 auto tick = rtc::TimeMillis();
256
257 CritScope lock(&pending_lock_);
258
259 if (thread_should_quit_) {
260 result.final_task_ = true;
261 return result;
262 }
263
264 if (delayed_queue_.size() > 0) {
265 auto delayed_entry = delayed_queue_.begin();
266 const auto& delay_info = delayed_entry->first;
267 auto& delay_run = delayed_entry->second;
268 if (tick >= delay_info.next_fire_at_ms_) {
269 if (pending_queue_.size() > 0) {
270 auto& entry = pending_queue_.front();
271 auto& entry_order = entry.first;
272 auto& entry_run = entry.second;
273 if (entry_order < delay_info.order_) {
274 result.run_task_ = std::move(entry_run);
275 pending_queue_.pop();
276 return result;
277 }
278 }
279
280 result.run_task_ = std::move(delay_run);
281 delayed_queue_.erase(delayed_entry);
282 return result;
283 }
284
285 result.sleep_time_ms_ = delay_info.next_fire_at_ms_ - tick;
286 }
287
288 if (pending_queue_.size() > 0) {
289 auto& entry = pending_queue_.front();
290 result.run_task_ = std::move(entry.second);
291 pending_queue_.pop();
292 }
293
294 return result;
295}
296
297// static
298void TaskQueue::Impl::ThreadMain(void* context) {
299 TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
300 me->ProcessTasks();
301}
302
303void TaskQueue::Impl::ProcessTasks() {
304 thread_context_ = this;
305 started_.Set();
306
307 while (true) {
308 auto task = GetNextTask();
309
310 if (task.final_task_)
311 break;
312
313 if (task.run_task_) {
314 // process entry immediately then try again
315 QueuedTask* release_ptr = task.run_task_.release();
316 if (release_ptr->Run())
317 delete release_ptr;
318
319 // attempt to sleep again
320 continue;
321 }
322
323 if (0 == task.sleep_time_ms_)
324 flag_notify_.Wait(Event::kForever);
325 else
326 flag_notify_.Wait(task.sleep_time_ms_);
327 }
328
329 stopped_.Set();
330}
331
332void TaskQueue::Impl::NotifyWake() {
333 // The queue holds pending tasks to complete. Either tasks are to be
334 // executed immediately or tasks are to be run at some future delayed time.
335 // For immediate tasks the task queue's thread is busy running the task and
336 // the thread will not be waiting on the flag_notify_ event. If no immediate
337 // tasks are available but a delayed task is pending then the thread will be
338 // waiting on flag_notify_ with a delayed time-out of the nearest timed task
339 // to run. If no immediate or pending tasks are available, the thread will
340 // wait on flag_notify_ until signaled that a task has been added (or the
341 // thread to be told to shutdown).
342
343 // In all cases, when a new immediate task, delayed task, or request to
344 // shutdown the thread is added the flag_notify_ is signaled after. If the
345 // thread was waiting then the thread will wake up immediately and re-assess
346 // what task needs to be run next (i.e. run a task now, wait for the nearest
347 // timed delayed task, or shutdown the thread). If the thread was not waiting
348 // then the thread will remained signaled to wake up the next time any
349 // attempt to wait on the flag_notify_ event occurs.
350
351 // Any immediate or delayed pending task (or request to shutdown the thread)
352 // must always be added to the queue prior to signaling flag_notify_ to wake
353 // up the possibly sleeping thread. This prevents a race condition where the
354 // thread is notified to wake up but the task queue's thread finds nothing to
355 // do so it waits once again to be signaled where such a signal may never
356 // happen.
357 flag_notify_.Set();
358}
359
360// Boilerplate for the PIMPL pattern.
361TaskQueue::TaskQueue(const char* queue_name, Priority priority)
362 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
363}
364
365TaskQueue::~TaskQueue() {}
366
367// static
368TaskQueue* TaskQueue::Current() {
369 return TaskQueue::Impl::CurrentQueue();
370}
371
372// Used for DCHECKing the current queue.
373bool TaskQueue::IsCurrent() const {
374 return impl_->IsCurrent();
375}
376
377void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
378 return TaskQueue::impl_->PostTask(std::move(task));
379}
380
381void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
382 std::unique_ptr<QueuedTask> reply,
383 TaskQueue* reply_queue) {
384 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
385 reply_queue->impl_.get());
386}
387
388void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
389 std::unique_ptr<QueuedTask> reply) {
390 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
391 impl_.get());
392}
393
394void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
395 uint32_t milliseconds) {
396 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
397}
398
399} // namespace rtc