Update TaskQueueStdlib implementation to absl::AnyInvocable

Bug: webrtc:14245
Change-Id: Ic0c55cbb4dbdd31359bbe15f1acd7a2b7e9e61f7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268901
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37568}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index c183f5f..528f124 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -712,6 +712,7 @@
   ]
   deps = [
     ":checks",
+    ":divide_round",
     ":logging",
     ":macromagic",
     ":platform_thread",
@@ -719,9 +720,13 @@
     ":safe_conversions",
     ":timeutils",
     "../api/task_queue",
+    "../api/units:time_delta",
     "synchronization:mutex",
   ]
-  absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
+  absl_deps = [
+    "//third_party/abseil-cpp/absl/functional:any_invocable",
+    "//third_party/abseil-cpp/absl/strings",
+  ]
 }
 
 rtc_library("weak_ptr") {
diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc
index 946261e..19e7dea 100644
--- a/rtc_base/task_queue_stdlib.cc
+++ b/rtc_base/task_queue_stdlib.cc
@@ -18,12 +18,14 @@
 #include <queue>
 #include <utility>
 
+#include "absl/functional/any_invocable.h"
 #include "absl/strings/string_view.h"
-#include "api/task_queue/queued_task.h"
 #include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/event.h"
 #include "rtc_base/logging.h"
+#include "rtc_base/numerics/divide_round.h"
 #include "rtc_base/platform_thread.h"
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/thread_annotations.h"
@@ -50,27 +52,29 @@
   ~TaskQueueStdlib() override = default;
 
   void Delete() override;
-  void PostTask(std::unique_ptr<QueuedTask> task) override;
-  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                       uint32_t milliseconds) override;
+  void PostTask(absl::AnyInvocable<void() &&> task) override;
+  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                       TimeDelta delay) override;
+  void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                    TimeDelta delay) override;
 
  private:
   using OrderId = uint64_t;
 
   struct DelayedEntryTimeout {
-    int64_t next_fire_at_ms_{};
-    OrderId order_{};
+    int64_t next_fire_at_us{};
+    OrderId order{};
 
     bool operator<(const DelayedEntryTimeout& o) const {
-      return std::tie(next_fire_at_ms_, order_) <
-             std::tie(o.next_fire_at_ms_, o.order_);
+      return std::tie(next_fire_at_us, order) <
+             std::tie(o.next_fire_at_us, o.order);
     }
   };
 
   struct NextTask {
-    bool final_task_ = false;
-    std::unique_ptr<QueuedTask> run_task_;
-    int64_t sleep_time_ms_ = 0;
+    bool final_task = false;
+    absl::AnyInvocable<void() &&> run_task;
+    int64_t sleep_time_ms = rtc::Event::kForever;
   };
 
   static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me,
@@ -97,7 +101,7 @@
 
   // The list of all pending tasks that need to be processed in the
   // FIFO queue ordering on the worker thread.
-  std::queue<std::pair<OrderId, std::unique_ptr<QueuedTask>>> pending_queue_
+  std::queue<std::pair<OrderId, absl::AnyInvocable<void() &&>>> pending_queue_
       RTC_GUARDED_BY(pending_lock_);
 
   // The list of all pending tasks that need to be processed at a future
@@ -105,8 +109,8 @@
   // happen at exactly the same time interval as another task then the
   // task is processed based on FIFO ordering. std::priority_queue was
   // considered but rejected due to its inability to extract the
-  // std::unique_ptr out of the queue without the presence of a hack.
-  std::map<DelayedEntryTimeout, std::unique_ptr<QueuedTask>> delayed_queue_
+  // move-only value out of the queue without the presence of a hack.
+  std::map<DelayedEntryTimeout, absl::AnyInvocable<void() &&>> delayed_queue_
       RTC_GUARDED_BY(pending_lock_);
 
   // Contains the active worker thread assigned to processing
@@ -151,43 +155,45 @@
   delete this;
 }
 
-void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueStdlib::PostTask(absl::AnyInvocable<void() &&> task) {
   {
     MutexLock lock(&pending_lock_);
-    OrderId order = thread_posting_order_++;
-
-    pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
-        order, std::move(task)));
+    pending_queue_.push(
+        std::make_pair(++thread_posting_order_, std::move(task)));
   }
 
   NotifyWake();
 }
 
-void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
-                                      uint32_t milliseconds) {
-  const auto fire_at = rtc::TimeMillis() + milliseconds;
-
-  DelayedEntryTimeout delay;
-  delay.next_fire_at_ms_ = fire_at;
+void TaskQueueStdlib::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                                      TimeDelta delay) {
+  DelayedEntryTimeout delayed_entry;
+  delayed_entry.next_fire_at_us = rtc::TimeMicros() + delay.us();
 
   {
     MutexLock lock(&pending_lock_);
-    delay.order_ = ++thread_posting_order_;
-    delayed_queue_[delay] = std::move(task);
+    delayed_entry.order = ++thread_posting_order_;
+    delayed_queue_[delayed_entry] = std::move(task);
   }
 
   NotifyWake();
 }
 
+void TaskQueueStdlib::PostDelayedHighPrecisionTask(
+    absl::AnyInvocable<void() &&> task,
+    TimeDelta delay) {
+  PostDelayedTask(std::move(task), delay);
+}
+
 TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
   NextTask result;
 
-  const auto tick = rtc::TimeMillis();
+  const int64_t tick_us = rtc::TimeMicros();
 
   MutexLock lock(&pending_lock_);
 
   if (thread_should_quit_) {
-    result.final_task_ = true;
+    result.final_task = true;
     return result;
   }
 
@@ -195,29 +201,30 @@
     auto delayed_entry = delayed_queue_.begin();
     const auto& delay_info = delayed_entry->first;
     auto& delay_run = delayed_entry->second;
-    if (tick >= delay_info.next_fire_at_ms_) {
+    if (tick_us >= delay_info.next_fire_at_us) {
       if (pending_queue_.size() > 0) {
         auto& entry = pending_queue_.front();
         auto& entry_order = entry.first;
         auto& entry_run = entry.second;
-        if (entry_order < delay_info.order_) {
-          result.run_task_ = std::move(entry_run);
+        if (entry_order < delay_info.order) {
+          result.run_task = std::move(entry_run);
           pending_queue_.pop();
           return result;
         }
       }
 
-      result.run_task_ = std::move(delay_run);
+      result.run_task = std::move(delay_run);
       delayed_queue_.erase(delayed_entry);
       return result;
     }
 
-    result.sleep_time_ms_ = delay_info.next_fire_at_ms_ - tick;
+    result.sleep_time_ms =
+        DivideRoundUp(delay_info.next_fire_at_us - tick_us, 1'000);
   }
 
   if (pending_queue_.size() > 0) {
     auto& entry = pending_queue_.front();
-    result.run_task_ = std::move(entry.second);
+    result.run_task = std::move(entry.second);
     pending_queue_.pop();
   }
 
@@ -228,21 +235,18 @@
   while (true) {
     auto task = GetNextTask();
 
-    if (task.final_task_)
+    if (task.final_task)
       break;
 
-    if (task.run_task_) {
+    if (task.run_task) {
       // process entry immediately then try again
-      QueuedTask* release_ptr = task.run_task_.release();
-      if (release_ptr->Run())
-        delete release_ptr;
+      std::move(task.run_task)();
 
       // Attempt to run more tasks before going to sleep.
       continue;
     }
 
-    flag_notify_.Wait(0 == task.sleep_time_ms_ ? rtc::Event::kForever
-                                               : task.sleep_time_ms_);
+    flag_notify_.Wait(task.sleep_time_ms);
   }
 }