Update TaskQueueLibevent implementation to absl::AnyInvocable

Bug: webrtc:14245, webrtc:12889
Change-Id: I1aa20e3d5645c270abd1bee0c45c6982e799eaa4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268767
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37563}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 322fd7a..546a6d6 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -645,10 +645,12 @@
       ":safe_conversions",
       ":timeutils",
       "../api/task_queue",
+      "../api/units:time_delta",
       "synchronization:mutex",
     ]
     absl_deps = [
       "//third_party/abseil-cpp/absl/container:inlined_vector",
+      "//third_party/abseil-cpp/absl/functional:any_invocable",
       "//third_party/abseil-cpp/absl/strings",
     ]
     if (rtc_build_libevent) {
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index ba80a64..f50e5a6 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -24,9 +24,10 @@
 #include <utility>
 
 #include "absl/container/inlined_vector.h"
+#include "absl/functional/any_invocable.h"
 #include "absl/strings/string_view.h"
-#include "api/task_queue/queued_task.h"
 #include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/logging.h"
 #include "rtc_base/numerics/safe_conversions.h"
@@ -106,14 +107,18 @@
   TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
 
   void Delete() override;
-  void PostTask(std::unique_ptr<QueuedTask> task) override;
-  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                       uint32_t milliseconds) override;
+  void PostTask(absl::AnyInvocable<void() &&> task) override;
+  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                       TimeDelta delay) override;
+  void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                    TimeDelta delay) override;
 
  private:
-  class SetTimerTask;
   struct TimerEvent;
 
+  void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
+                                  TimeDelta delay);
+
   ~TaskQueueLibevent() override = default;
 
   static void OnWakeup(int socket, short flags, void* context);  // NOLINT
@@ -126,43 +131,20 @@
   event wakeup_event_;
   rtc::PlatformThread thread_;
   Mutex pending_lock_;
-  absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
+  absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
       RTC_GUARDED_BY(pending_lock_);
   // Holds a list of events pending timers for cleanup when the loop exits.
   std::list<TimerEvent*> pending_timers_;
 };
 
 struct TaskQueueLibevent::TimerEvent {
-  TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
+  TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
       : task_queue(task_queue), task(std::move(task)) {}
   ~TimerEvent() { event_del(&ev); }
 
   event ev;
   TaskQueueLibevent* task_queue;
-  std::unique_ptr<QueuedTask> task;
-};
-
-class TaskQueueLibevent::SetTimerTask : public QueuedTask {
- public:
-  SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
-      : task_(std::move(task)),
-        milliseconds_(milliseconds),
-        posted_(rtc::Time32()) {}
-
- private:
-  bool Run() override {
-    // Compensate for the time that has passed since construction
-    // and until we got here.
-    uint32_t post_time = rtc::Time32() - posted_;
-    TaskQueueLibevent::Current()->PostDelayedTask(
-        std::move(task_),
-        post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
-    return true;
-  }
-
-  std::unique_ptr<QueuedTask> task_;
-  const uint32_t milliseconds_;
-  const uint32_t posted_;
+  absl::AnyInvocable<void() &&> task;
 };
 
 TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
@@ -219,7 +201,7 @@
   delete this;
 }
 
-void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
   {
     MutexLock lock(&pending_lock_);
     bool had_pending_tasks = !pending_.empty();
@@ -242,21 +224,43 @@
                sizeof(message));
 }
 
-void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                        uint32_t milliseconds) {
+void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
+    absl::AnyInvocable<void() &&> task,
+    TimeDelta delay) {
+  // libevent api is not thread safe by default, thus event_add need to be
+  // called on the `thread_`.
+  RTC_DCHECK(IsCurrent());
+
+  TimerEvent* timer = new TimerEvent(this, std::move(task));
+  EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
+              timer);
+  pending_timers_.push_back(timer);
+  timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
+                .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
+  event_add(&timer->ev, &tv);
+}
+
+void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                                        TimeDelta delay) {
   if (IsCurrent()) {
-    TimerEvent* timer = new TimerEvent(this, std::move(task));
-    EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
-                timer);
-    pending_timers_.push_back(timer);
-    timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
-                  rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
-    event_add(&timer->ev, &tv);
+    PostDelayedTaskOnTaskQueue(std::move(task), delay);
   } else {
-    PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
+    int64_t posted_us = rtc::TimeMicros();
+    PostTask([posted_us, delay, task = std::move(task), this]() mutable {
+      // Compensate for the time that has passed since the posting.
+      TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
+      PostDelayedTaskOnTaskQueue(
+          std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
+    });
   }
 }
 
+void TaskQueueLibevent::PostDelayedHighPrecisionTask(
+    absl::AnyInvocable<void() &&> task,
+    TimeDelta delay) {
+  PostDelayedTask(std::move(task), delay);
+}
+
 // static
 void TaskQueueLibevent::OnWakeup(int socket,
                                  short flags,  // NOLINT
@@ -271,19 +275,16 @@
       event_base_loopbreak(me->event_base_);
       break;
     case kRunTasks: {
-      absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
+      absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
       {
         MutexLock lock(&me->pending_lock_);
         tasks.swap(me->pending_);
       }
       RTC_DCHECK(!tasks.empty());
       for (auto& task : tasks) {
-        if (task->Run()) {
-          task.reset();
-        } else {
-          // `false` means the task should *not* be deleted.
-          task.release();
-        }
+        std::move(task)();
+        // Prefer to delete the `task` before running the next one.
+        task = nullptr;
       }
       break;
     }
@@ -298,8 +299,7 @@
                                  short flags,  // NOLINT
                                  void* context) {
   TimerEvent* timer = static_cast<TimerEvent*>(context);
-  if (!timer->task->Run())
-    timer->task.release();
+  std::move(timer->task)();
   timer->task_queue->pending_timers_.remove(timer);
   delete timer;
 }