Fix TaskQueueLibevent::PostTask when used on the same TaskQueue
Stop using event_base_once because it doesn't guarantee to free QueuedTask when task not run and thus may break TaskQueue guarantee all posted tasks are eventually deleted
Bug: webrtc:10731, webrtc:10278
Change-Id: Id073a6092cf603cac5768da7a0770371053b20cc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/141420
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28241}
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index 38704c8..9a01f46 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -121,7 +121,6 @@
static void ThreadMain(void* context);
static void OnWakeup(int socket, short flags, void* context); // NOLINT
- static void RunTask(int fd, short flags, void* context); // NOLINT
static void RunTimer(int fd, short flags, void* context); // NOLINT
bool is_active_ = true;
@@ -214,30 +213,18 @@
}
void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
- RTC_DCHECK(task.get());
- // libevent isn't thread safe. This means that we can't use methods such
- // as event_base_once to post tasks to the worker thread from a different
- // thread. However, we can use it when posting from the worker thread itself.
- if (IsCurrent()) {
- if (event_base_once(event_base_, -1, EV_TIMEOUT,
- &TaskQueueLibevent::RunTask, task.get(),
- nullptr) == 0) {
- task.release();
- }
- } else {
- QueuedTask* task_id = task.get(); // Only used for comparison.
- {
- rtc::CritScope lock(&pending_lock_);
- pending_.push_back(std::move(task));
- }
- char message = kRunTask;
- if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
- RTC_LOG(WARNING) << "Failed to queue task.";
- rtc::CritScope lock(&pending_lock_);
- pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
- return t.get() == task_id;
- });
- }
+ QueuedTask* task_id = task.get(); // Only used for comparison.
+ {
+ rtc::CritScope lock(&pending_lock_);
+ pending_.push_back(std::move(task));
+ }
+ char message = kRunTask;
+ if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
+ RTC_LOG(WARNING) << "Failed to queue task.";
+ rtc::CritScope lock(&pending_lock_);
+ pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
+ return t.get() == task_id;
+ });
}
}
@@ -303,13 +290,6 @@
}
// static
-void TaskQueueLibevent::RunTask(int fd, short flags, void* context) { // NOLINT
- auto* task = static_cast<QueuedTask*>(context);
- if (task->Run())
- delete task;
-}
-
-// static
void TaskQueueLibevent::RunTimer(int fd,
short flags, // NOLINT
void* context) {