In rtc::Thread implement posting AnyInvocable

Lots of code call rtc::Thread directly instead of through TaskQueueBase
interface, thus to continue migration step by step rtc::Thread needs
to implement both old and new TaskQueueBase interfaces.

Bug: webrtc:14245
Change-Id: Ie7cac897a4c8a6227b8d467a39adb30aec6f1318
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267984
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37474}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 7344695..e259fc8 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -914,6 +914,8 @@
   absl_deps = [
     "//third_party/abseil-cpp/absl/algorithm:container",
     "//third_party/abseil-cpp/absl/base:core_headers",
+    "//third_party/abseil-cpp/absl/cleanup",
+    "//third_party/abseil-cpp/absl/functional:any_invocable",
     "//third_party/abseil-cpp/absl/strings",
   ]
   deps = [
@@ -943,6 +945,7 @@
     "../api/task_queue",
     "../api/task_queue:pending_task_safety_flag",
     "../api/task_queue:to_queued_task",
+    "../api/units:time_delta",
     "synchronization:mutex",
     "system:no_unique_address",
     "system:rtc_export",
@@ -1730,6 +1733,7 @@
         "../api/task_queue:pending_task_safety_flag",
         "../api/task_queue:task_queue_test",
         "../api/task_queue:to_queued_task",
+        "../api/units:time_delta",
         "../test:field_trial",
         "../test:fileutils",
         "../test:rtc_expect_death",
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index e1db2bc..ef165ff 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -31,8 +31,8 @@
 #include <utility>
 
 #include "absl/algorithm/container.h"
+#include "absl/cleanup/cleanup.h"
 #include "api/sequence_checker.h"
-#include "api/task_queue/to_queued_task.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/deprecated/recursive_critical_section.h"
 #include "rtc_base/event.h"
@@ -72,22 +72,25 @@
 namespace rtc {
 namespace {
 
-class MessageHandlerWithTask final : public MessageHandler {
+struct AnyInvocableMessage final : public MessageData {
+  explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
+      : task(std::move(task)) {}
+  absl::AnyInvocable<void() &&> task;
+};
+
+class AnyInvocableMessageHandler final : public MessageHandler {
  public:
-  MessageHandlerWithTask() {}
-
-  MessageHandlerWithTask(const MessageHandlerWithTask&) = delete;
-  MessageHandlerWithTask& operator=(const MessageHandlerWithTask&) = delete;
-
   void OnMessage(Message* msg) override {
-    static_cast<rtc_thread_internal::MessageLikeTask*>(msg->pdata)->Run();
+    std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
     delete msg->pdata;
   }
-
- private:
-  ~MessageHandlerWithTask() override {}
 };
 
+MessageHandler* GetAnyInvocableMessageHandler() {
+  static MessageHandler* const handler = new AnyInvocableMessageHandler;
+  return handler;
+}
+
 class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
  public:
   MarkProcessingCritScope(const RecursiveCriticalSection* cs,
@@ -761,8 +764,7 @@
 
 void Thread::SetDispatchWarningMs(int deadline) {
   if (!IsCurrent()) {
-    PostTask(webrtc::ToQueuedTask(
-        [this, deadline]() { SetDispatchWarningMs(deadline); }));
+    PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
     return;
   }
   RTC_DCHECK_RUN_ON(this);
@@ -948,18 +950,19 @@
     done_event.reset(new rtc::Event());
 
   bool ready = false;
-  PostTask(webrtc::ToQueuedTask(
-      [&msg]() mutable { msg.phandler->OnMessage(&msg); },
-      [this, &ready, current_thread, done = done_event.get()] {
-        if (current_thread) {
-          CritScope cs(&crit_);
-          ready = true;
-          current_thread->socketserver()->WakeUp();
-        } else {
-          done->Set();
-        }
-      }));
-
+  absl::Cleanup cleanup = [this, &ready, current_thread,
+                           done = done_event.get()] {
+    if (current_thread) {
+      CritScope cs(&crit_);
+      ready = true;
+      current_thread->socketserver()->WakeUp();
+    } else {
+      done->Set();
+    }
+  };
+  PostTask([&msg, cleanup = std::move(cleanup)]() mutable {
+    msg.phandler->OnMessage(&msg);
+  });
   if (current_thread) {
     bool waited = false;
     crit_.Enter();
@@ -1115,6 +1118,28 @@
   delete this;
 }
 
+void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
+  // Though Post takes MessageData by raw pointer (last parameter), it still
+  // takes it with ownership.
+  Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
+       /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
+void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                             webrtc::TimeDelta delay) {
+  // This implementation does not support low precision yet.
+  PostDelayedHighPrecisionTask(std::move(task), delay);
+}
+
+void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                          webrtc::TimeDelta delay) {
+  int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
+  // Though PostDelayed takes MessageData by raw pointer (last parameter),
+  // it still takes it with ownership.
+  PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
+              /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
 bool Thread::IsProcessingMessagesForTesting() {
   return (owned_ || IsCurrent()) && !IsQuitting();
 }
@@ -1183,13 +1208,6 @@
 #endif
 }
 
-// static
-MessageHandler* Thread::GetPostTaskMessageHandler() {
-  // Allocate at first call, never deallocate.
-  static MessageHandler* handler = new MessageHandlerWithTask;
-  return handler;
-}
-
 AutoThread::AutoThread()
     : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
   if (!ThreadManager::Instance()->CurrentThread()) {
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 3364684..e87248c 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -28,10 +28,12 @@
 #include <pthread.h>
 #endif
 #include "absl/base/attributes.h"
+#include "absl/functional/any_invocable.h"
 #include "api/function_view.h"
 #include "api/task_queue/queued_task.h"
 #include "api/task_queue/task_queue_base.h"
 #include "api/task_queue/to_queued_task.h"
+#include "api/units/time_delta.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/deprecated/recursive_critical_section.h"
 #include "rtc_base/location.h"
@@ -79,32 +81,6 @@
 
 class Thread;
 
-namespace rtc_thread_internal {
-
-class MessageLikeTask : public MessageData {
- public:
-  virtual void Run() = 0;
-};
-
-template <class FunctorT>
-class MessageWithFunctor final : public MessageLikeTask {
- public:
-  explicit MessageWithFunctor(FunctorT&& functor)
-      : functor_(std::forward<FunctorT>(functor)) {}
-
-  MessageWithFunctor(const MessageWithFunctor&) = delete;
-  MessageWithFunctor& operator=(const MessageWithFunctor&) = delete;
-
-  void Run() override { functor_(); }
-
- private:
-  ~MessageWithFunctor() override {}
-
-  typename std::remove_reference<FunctorT>::type functor_;
-};
-
-}  // namespace rtc_thread_internal
-
 class RTC_EXPORT ThreadManager {
  public:
   static const int kForever = -1;
@@ -418,36 +394,29 @@
   bool IsInvokeToThreadAllowed(rtc::Thread* target);
 
   // From TaskQueueBase
+  void Delete() override;
+  void PostTask(absl::AnyInvocable<void() &&> task) override;
+  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                       webrtc::TimeDelta delay) override;
+  void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                    webrtc::TimeDelta delay) override;
+
+  // Legacy TaskQueueBase methods, do not use in new code.
+  // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
+  // directly is updated to use PostTask methods above.
   void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
   void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
                        uint32_t milliseconds) override;
   void PostDelayedHighPrecisionTask(std::unique_ptr<webrtc::QueuedTask> task,
                                     uint32_t milliseconds) override;
-  void Delete() override;
 
-  // Helper methods to avoid having to do ToQueuedTask() at the calling places.
-  template <class Closure,
-            typename std::enable_if<!std::is_convertible<
-                Closure,
-                std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
-  void PostTask(Closure&& closure) {
-    PostTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)));
-  }
-  template <class Closure,
-            typename std::enable_if<!std::is_convertible<
-                Closure,
-                std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
-  void PostDelayedTask(Closure&& closure, uint32_t milliseconds) {
-    PostDelayedTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)),
-                    milliseconds);
-  }
-  template <class Closure,
-            typename std::enable_if<!std::is_convertible<
-                Closure,
-                std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
-  void PostDelayedHighPrecisionTask(Closure&& closure, uint32_t milliseconds) {
-    PostDelayedHighPrecisionTask(
-        webrtc::ToQueuedTask(std::forward<Closure>(closure)), milliseconds);
+  // Legacy helper method, do not use in new code.
+  // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
+  // directly is updated to use PostTask methods above.
+  ABSL_DEPRECATED("Pass delay as webrtc::TimeDelta type")
+  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                       uint32_t milliseconds) {
+    PostDelayedTask(std::move(task), webrtc::TimeDelta::Millis(milliseconds));
   }
 
   // ProcessMessages will process I/O and dispatch messages until:
@@ -609,10 +578,6 @@
   // Called by the ThreadManager when being unset as the current thread.
   void ClearCurrentTaskQueue();
 
-  // Returns a static-lifetime MessageHandler which runs message with
-  // MessageLikeTask payload data.
-  static MessageHandler* GetPostTaskMessageHandler();
-
   bool fPeekKeep_;
   Message msgPeek_;
   MessageList messages_ RTC_GUARDED_BY(crit_);
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index 321cbc3..7fcf7ca 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -14,7 +14,7 @@
 
 #include "api/task_queue/task_queue_factory.h"
 #include "api/task_queue/task_queue_test.h"
-#include "api/task_queue/to_queued_task.h"
+#include "api/units/time_delta.h"
 #include "rtc_base/async_invoker.h"
 #include "rtc_base/async_udp_socket.h"
 #include "rtc_base/checks.h"
@@ -36,7 +36,7 @@
 namespace rtc {
 namespace {
 
-using ::webrtc::ToQueuedTask;
+using ::webrtc::TimeDelta;
 
 // Generates a sequence of numbers (collaboratively).
 class TestGenerator {
@@ -373,8 +373,8 @@
   auto thread1 = Thread::CreateWithSocketServer();
   auto thread2 = Thread::CreateWithSocketServer();
 
-  thread1->PostTask(ToQueuedTask(
-      [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+  thread1->PostTask(
+      [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
   main_thread.ProcessMessages(100);
 }
 
@@ -389,11 +389,11 @@
   thread1->AllowInvokesToThread(thread2.get());
   thread1->AllowInvokesToThread(thread3.get());
 
-  thread1->PostTask(ToQueuedTask([&]() {
+  thread1->PostTask([&]() {
     EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
     EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
     EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
-  }));
+  });
   main_thread.ProcessMessages(100);
 }
 
@@ -405,9 +405,8 @@
 
   thread1->DisallowAllInvokes();
 
-  thread1->PostTask(ToQueuedTask([&]() {
-    EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
-  }));
+  thread1->PostTask(
+      [&]() { EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
   main_thread.ProcessMessages(100);
 }
 #endif  // (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
@@ -418,8 +417,8 @@
   auto thread1 = Thread::CreateWithSocketServer();
   auto thread2 = Thread::CreateWithSocketServer();
 
-  thread1->PostTask(ToQueuedTask(
-      [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+  thread1->PostTask(
+      [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
   main_thread.ProcessMessages(100);
 }
 
@@ -672,11 +671,11 @@
   };
 
   // Post messages (both delayed and non delayed) to both threads.
-  a->PostTask(ToQueuedTask(incrementer));
-  b->PostTask(ToQueuedTask(incrementer));
-  a->PostDelayedTask(ToQueuedTask(incrementer), 0);
-  b->PostDelayedTask(ToQueuedTask(incrementer), 0);
-  main_thread.PostTask(ToQueuedTask(event_signaler));
+  a->PostTask(incrementer);
+  b->PostTask(incrementer);
+  a->PostDelayedTask(incrementer, TimeDelta::Zero());
+  b->PostDelayedTask(incrementer, TimeDelta::Zero());
+  main_thread.PostTask(event_signaler);
 
   ThreadManager::ProcessAllMessageQueuesForTesting();
   EXPECT_EQ(4, messages_processed.load(std::memory_order_acquire));
@@ -1083,7 +1082,7 @@
         WaitAndSetEvent(&event_set_by_test_thread,
                         &event_set_by_background_thread);
       },
-      /*milliseconds=*/10);
+      TimeDelta::Millis(10));
   event_set_by_test_thread.Set();
   event_set_by_background_thread.Wait(Event::kForever);
 }
@@ -1100,18 +1099,18 @@
 
   background_thread->PostDelayedTask(
       [&third, &fourth] { WaitAndSetEvent(&third, &fourth); },
-      /*milliseconds=*/11);
+      TimeDelta::Millis(11));
   background_thread->PostDelayedTask(
       [&first, &second] { WaitAndSetEvent(&first, &second); },
-      /*milliseconds=*/9);
+      TimeDelta::Millis(9));
   background_thread->PostDelayedTask(
       [&second, &third] { WaitAndSetEvent(&second, &third); },
-      /*milliseconds=*/10);
+      TimeDelta::Millis(10));
 
   // All tasks have been posted before the first one is unblocked.
   first.Set();
   // Only if the chain is invoked in delay order will the last event be set.
-  clock.AdvanceTime(webrtc::TimeDelta::Millis(11));
+  clock.AdvanceTime(TimeDelta::Millis(11));
   EXPECT_TRUE(fourth.Wait(0));
 }