Batch process pending tasks in the libevent TaskQueue

This change improves performance under high load by processing
all pending tasks each time the thread is woken up by libevent.

Additionally, the pipe used to wake up the TaskQueue thread now
not be written to if there's already a pending write on the pipe.
This fixes a bug where under high load the pipe write buffer can
fill and cause tasks to get dropped.

Bug: webrtc:11259, webrtc:8876
Change-Id: Ic82978c71bf9e9a25f281ca4775d46168d161d4e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165420
Commit-Queue: Steve Anton <steveanton@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30202}
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index 7638869..349a5f2 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -23,6 +23,7 @@
 #include <type_traits>
 #include <utility>
 
+#include "absl/container/inlined_vector.h"
 #include "absl/strings/string_view.h"
 #include "api/task_queue/queued_task.h"
 #include "api/task_queue/task_queue_base.h"
@@ -39,7 +40,7 @@
 namespace webrtc {
 namespace {
 constexpr char kQuit = 1;
-constexpr char kRunTask = 2;
+constexpr char kRunTasks = 2;
 
 using Priority = TaskQueueFactory::Priority;
 
@@ -130,7 +131,8 @@
   event wakeup_event_;
   rtc::PlatformThread thread_;
   rtc::CriticalSection pending_lock_;
-  std::list<std::unique_ptr<QueuedTask>> pending_ RTC_GUARDED_BY(pending_lock_);
+  absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
+      RTC_GUARDED_BY(pending_lock_);
   // Holds a list of events pending timers for cleanup when the loop exits.
   std::list<TimerEvent*> pending_timers_;
 };
@@ -213,19 +215,26 @@
 }
 
 void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
-  QueuedTask* task_id = task.get();  // Only used for comparison.
   {
     rtc::CritScope lock(&pending_lock_);
+    bool had_pending_tasks = !pending_.empty();
     pending_.push_back(std::move(task));
+
+    // Only write to the pipe if there were no pending tasks before this one
+    // since the thread could be sleeping. If there were already pending tasks
+    // then we know there's either a pending write in the pipe or the thread has
+    // not yet processed the pending tasks. In either case, the thread will
+    // eventually wake up and process all pending tasks including this one.
+    if (had_pending_tasks) {
+      return;
+    }
   }
-  char message = kRunTask;
-  if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
-    RTC_LOG(WARNING) << "Failed to queue task.";
-    rtc::CritScope lock(&pending_lock_);
-    pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
-      return t.get() == task_id;
-    });
-  }
+
+  // Note: This behvior outlined above ensures we never fill up the pipe write
+  // buffer since there will only ever be 1 byte pending.
+  char message = kRunTasks;
+  RTC_CHECK_EQ(write(wakeup_pipe_in_, &message, sizeof(message)),
+               sizeof(message));
 }
 
 void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
@@ -270,17 +279,21 @@
       me->is_active_ = false;
       event_base_loopbreak(me->event_base_);
       break;
-    case kRunTask: {
-      std::unique_ptr<QueuedTask> task;
+    case kRunTasks: {
+      absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
       {
         rtc::CritScope lock(&me->pending_lock_);
-        RTC_DCHECK(!me->pending_.empty());
-        task = std::move(me->pending_.front());
-        me->pending_.pop_front();
-        RTC_DCHECK(task.get());
+        tasks.swap(me->pending_);
       }
-      if (!task->Run())
-        task.release();
+      RTC_DCHECK(!tasks.empty());
+      for (auto& task : tasks) {
+        if (task->Run()) {
+          task.reset();
+        } else {
+          // |false| means the task should *not* be deleted.
+          task.release();
+        }
+      }
       break;
     }
     default: