| /* |
| * Copyright 2016 The WebRTC Project Authors. All rights reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| |
| #ifndef WEBRTC_BASE_TASK_QUEUE_H_ |
| #define WEBRTC_BASE_TASK_QUEUE_H_ |
| |
| #include <list> |
| #include <memory> |
| #include <unordered_map> |
| |
| #if defined(WEBRTC_MAC) && !defined(WEBRTC_BUILD_LIBEVENT) |
| #include <dispatch/dispatch.h> |
| #endif |
| |
| #include "webrtc/base/constructormagic.h" |
| #include "webrtc/base/criticalsection.h" |
| |
| #if defined(WEBRTC_WIN) || defined(WEBRTC_BUILD_LIBEVENT) |
| #include "webrtc/base/platform_thread.h" |
| #endif |
| |
| #if defined(WEBRTC_BUILD_LIBEVENT) |
| struct event_base; |
| struct event; |
| #endif |
| |
| namespace rtc { |
| |
| // Base interface for asynchronously executed tasks. |
| // The interface basically consists of a single function, Run(), that executes |
| // on the target queue. For more details see the Run() method and TaskQueue. |
| class QueuedTask { |
| public: |
| QueuedTask() {} |
| virtual ~QueuedTask() {} |
| |
| // Main routine that will run when the task is executed on the desired queue. |
| // The task should return |true| to indicate that it should be deleted or |
| // |false| to indicate that the queue should consider ownership of the task |
| // having been transferred. Returning |false| can be useful if a task has |
| // re-posted itself to a different queue or is otherwise being re-used. |
| virtual bool Run() = 0; |
| |
| private: |
| RTC_DISALLOW_COPY_AND_ASSIGN(QueuedTask); |
| }; |
| |
| // Simple implementation of QueuedTask for use with rtc::Bind and lambdas. |
| template <class Closure> |
| class ClosureTask : public QueuedTask { |
| public: |
| explicit ClosureTask(const Closure& closure) : closure_(closure) {} |
| |
| private: |
| bool Run() override { |
| closure_(); |
| return true; |
| } |
| |
| Closure closure_; |
| }; |
| |
| // Extends ClosureTask to also allow specifying cleanup code. |
| // This is useful when using lambdas if guaranteeing cleanup, even if a task |
| // was dropped (queue is too full), is required. |
| template <class Closure, class Cleanup> |
| class ClosureTaskWithCleanup : public ClosureTask<Closure> { |
| public: |
| ClosureTaskWithCleanup(const Closure& closure, Cleanup cleanup) |
| : ClosureTask<Closure>(closure), cleanup_(cleanup) {} |
| ~ClosureTaskWithCleanup() { cleanup_(); } |
| |
| private: |
| Cleanup cleanup_; |
| }; |
| |
| // Convenience function to construct closures that can be passed directly |
| // to methods that support std::unique_ptr<QueuedTask> but not template |
| // based parameters. |
| template <class Closure> |
| static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure) { |
| return std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)); |
| } |
| |
| template <class Closure, class Cleanup> |
| static std::unique_ptr<QueuedTask> NewClosure(const Closure& closure, |
| const Cleanup& cleanup) { |
| return std::unique_ptr<QueuedTask>( |
| new ClosureTaskWithCleanup<Closure, Cleanup>(closure, cleanup)); |
| } |
| |
| // Implements a task queue that asynchronously executes tasks in a way that |
| // guarantees that they're executed in FIFO order and that tasks never overlap. |
| // Tasks may always execute on the same worker thread and they may not. |
| // To DCHECK that tasks are executing on a known task queue, use IsCurrent(). |
| // |
| // Here are some usage examples: |
| // |
| // 1) Asynchronously running a lambda: |
| // |
| // class MyClass { |
| // ... |
| // TaskQueue queue_("MyQueue"); |
| // }; |
| // |
| // void MyClass::StartWork() { |
| // queue_.PostTask([]() { Work(); }); |
| // ... |
| // |
| // 2) Doing work asynchronously on a worker queue and providing a notification |
| // callback on the current queue, when the work has been done: |
| // |
| // void MyClass::StartWorkAndLetMeKnowWhenDone( |
| // std::unique_ptr<QueuedTask> callback) { |
| // DCHECK(TaskQueue::Current()) << "Need to be running on a queue"; |
| // queue_.PostTaskAndReply([]() { Work(); }, std::move(callback)); |
| // } |
| // ... |
| // my_class->StartWorkAndLetMeKnowWhenDone( |
| // NewClosure([]() { LOG(INFO) << "The work is done!";})); |
| // |
| // 3) Posting a custom task on a timer. The task posts itself again after |
| // every running: |
| // |
| // class TimerTask : public QueuedTask { |
| // public: |
| // TimerTask() {} |
| // private: |
| // bool Run() override { |
| // ++count_; |
| // TaskQueue::Current()->PostDelayedTask( |
| // std::unique_ptr<QueuedTask>(this), 1000); |
| // // Ownership has been transferred to the next occurance, |
| // // so return false to prevent from being deleted now. |
| // return false; |
| // } |
| // int count_ = 0; |
| // }; |
| // ... |
| // queue_.PostDelayedTask( |
| // std::unique_ptr<QueuedTask>(new TimerTask()), 1000); |
| // |
| // For more examples, see task_queue_unittests.cc. |
| // |
| // A note on destruction: |
| // |
| // When a TaskQueue is deleted, pending tasks will not be executed but they will |
| // be deleted. The deletion of tasks may happen asynchronously after the |
| // TaskQueue itself has been deleted or it may happen synchronously while the |
| // TaskQueue instance is being deleted. This may vary from one OS to the next |
| // so assumptions about lifetimes of pending tasks should not be made. |
| class LOCKABLE TaskQueue { |
| public: |
| explicit TaskQueue(const char* queue_name); |
| // TODO(tommi): Implement move semantics? |
| ~TaskQueue(); |
| |
| static TaskQueue* Current(); |
| |
| // Used for DCHECKing the current queue. |
| static bool IsCurrent(const char* queue_name); |
| bool IsCurrent() const; |
| |
| // TODO(tommi): For better debuggability, implement RTC_FROM_HERE. |
| |
| // Ownership of the task is passed to PostTask. |
| void PostTask(std::unique_ptr<QueuedTask> task); |
| void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| std::unique_ptr<QueuedTask> reply, |
| TaskQueue* reply_queue); |
| void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| std::unique_ptr<QueuedTask> reply); |
| |
| void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds); |
| |
| template <class Closure> |
| void PostTask(const Closure& closure) { |
| PostTask(std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure))); |
| } |
| |
| template <class Closure> |
| void PostDelayedTask(const Closure& closure, uint32_t milliseconds) { |
| PostDelayedTask( |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(closure)), |
| milliseconds); |
| } |
| |
| template <class Closure1, class Closure2> |
| void PostTaskAndReply(const Closure1& task, |
| const Closure2& reply, |
| TaskQueue* reply_queue) { |
| PostTaskAndReply( |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)), |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply)), |
| reply_queue); |
| } |
| |
| template <class Closure> |
| void PostTaskAndReply(std::unique_ptr<QueuedTask> task, |
| const Closure& reply) { |
| PostTaskAndReply(std::move(task), std::unique_ptr<QueuedTask>( |
| new ClosureTask<Closure>(reply))); |
| } |
| |
| template <class Closure> |
| void PostTaskAndReply(const Closure& task, |
| std::unique_ptr<QueuedTask> reply) { |
| PostTaskAndReply( |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure>(task)), |
| std::move(reply)); |
| } |
| |
| template <class Closure1, class Closure2> |
| void PostTaskAndReply(const Closure1& task, const Closure2& reply) { |
| PostTaskAndReply( |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure1>(task)), |
| std::unique_ptr<QueuedTask>(new ClosureTask<Closure2>(reply))); |
| } |
| |
| private: |
| #if defined(WEBRTC_BUILD_LIBEVENT) |
| static bool ThreadMain(void* context); |
| static void OnWakeup(int socket, short flags, void* context); // NOLINT |
| static void RunTask(int fd, short flags, void* context); // NOLINT |
| static void RunTimer(int fd, short flags, void* context); // NOLINT |
| |
| class PostAndReplyTask; |
| class SetTimerTask; |
| |
| void PrepareReplyTask(PostAndReplyTask* reply_task); |
| void ReplyTaskDone(PostAndReplyTask* reply_task); |
| |
| struct QueueContext; |
| |
| int wakeup_pipe_in_ = -1; |
| int wakeup_pipe_out_ = -1; |
| event_base* event_base_; |
| std::unique_ptr<event> wakeup_event_; |
| PlatformThread thread_; |
| rtc::CriticalSection pending_lock_; |
| std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_); |
| std::list<PostAndReplyTask*> pending_replies_ GUARDED_BY(pending_lock_); |
| #elif defined(WEBRTC_MAC) |
| struct QueueContext; |
| struct TaskContext; |
| struct PostTaskAndReplyContext; |
| dispatch_queue_t queue_; |
| QueueContext* const context_; |
| #elif defined(WEBRTC_WIN) |
| typedef std::unordered_map<UINT_PTR, std::unique_ptr<QueuedTask>> |
| DelayedTasks; |
| static bool ThreadMain(void* context); |
| static bool ProcessQueuedMessages(DelayedTasks* delayed_tasks); |
| |
| class WorkerThread : public PlatformThread { |
| public: |
| WorkerThread(ThreadRunFunction func, void* obj, const char* thread_name) |
| : PlatformThread(func, obj, thread_name) {} |
| |
| bool QueueAPC(PAPCFUNC apc_function, ULONG_PTR data) { |
| return PlatformThread::QueueAPC(apc_function, data); |
| } |
| }; |
| WorkerThread thread_; |
| #else |
| #error not supported. |
| #endif |
| |
| RTC_DISALLOW_COPY_AND_ASSIGN(TaskQueue); |
| }; |
| |
| } // namespace rtc |
| |
| #endif // WEBRTC_BASE_TASK_QUEUE_H_ |