Delete rtc::Thread functions that use rtc::MessageHandler

Bug: webrtc:9702
Change-Id: I6fc8aa8a793caf19d62a149db1861c352c609255
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275774
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38150}
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 7525096..fbf4105 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -76,25 +76,6 @@
 
 using ::webrtc::TimeDelta;
 
-struct AnyInvocableMessage final : public MessageData {
-  explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
-      : task(std::move(task)) {}
-  absl::AnyInvocable<void() &&> task;
-};
-
-class AnyInvocableMessageHandler final : public MessageHandler {
- public:
-  void OnMessage(Message* msg) override {
-    std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
-    delete msg->pdata;
-  }
-};
-
-MessageHandler* GetAnyInvocableMessageHandler() {
-  static MessageHandler* const handler = new AnyInvocableMessageHandler;
-  return handler;
-}
-
 class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
  public:
   MarkProcessingCritScope(const RecursiveCriticalSection* cs,
@@ -407,7 +388,9 @@
     ss_->SetMessageQueue(nullptr);
   }
   ThreadManager::Remove(this);
-  ClearInternal(nullptr, MQID_ANY, nullptr);
+  // Clear.
+  messages_ = {};
+  delayed_messages_ = {};
 }
 
 SocketServer* Thread::socketserver() {
@@ -431,7 +414,7 @@
   stop_.store(0, std::memory_order_release);
 }
 
-bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
+absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) {
   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
 
   int64_t cmsTotal = cmsWait;
@@ -448,19 +431,19 @@
       // Check for delayed messages that have been triggered and calculate the
       // next trigger time.
       while (!delayed_messages_.empty()) {
-        if (msCurrent < delayed_messages_.top().run_time_ms_) {
+        if (msCurrent < delayed_messages_.top().run_time_ms) {
           cmsDelayNext =
-              TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
+              TimeDiff(delayed_messages_.top().run_time_ms, msCurrent);
           break;
         }
-        messages_.push_back(delayed_messages_.top().msg_);
+        messages_.push(std::move(delayed_messages_.top().functor));
         delayed_messages_.pop();
       }
       // Pull a message off the message queue, if available.
       if (!messages_.empty()) {
-        *pmsg = messages_.front();
-        messages_.pop_front();
-        return true;
+        absl::AnyInvocable<void()&&> task = std::move(messages_.front());
+        messages_.pop();
+        return task;
       }
     }
 
@@ -482,8 +465,8 @@
       // Wait and multiplex in the meantime
       if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
                                          : webrtc::TimeDelta::Millis(cmsNext),
-                     process_io))
-        return false;
+                     /*process_io=*/true))
+        return nullptr;
     }
 
     // If the specified timeout expired, return
@@ -492,20 +475,14 @@
     cmsElapsed = TimeDiff(msCurrent, msStart);
     if (cmsWait != kForever) {
       if (cmsElapsed >= cmsWait)
-        return false;
+        return nullptr;
     }
   }
-  return false;
+  return nullptr;
 }
 
-void Thread::Post(const Location& posted_from,
-                  MessageHandler* phandler,
-                  uint32_t id,
-                  MessageData* pdata,
-                  bool time_sensitive) {
-  RTC_DCHECK(!time_sensitive);
+void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
   if (IsQuitting()) {
-    delete pdata;
     return;
   }
 
@@ -515,42 +492,14 @@
 
   {
     CritScope cs(&crit_);
-    Message msg;
-    msg.posted_from = posted_from;
-    msg.phandler = phandler;
-    msg.message_id = id;
-    msg.pdata = pdata;
-    messages_.push_back(msg);
+    messages_.push(std::move(task));
   }
   WakeUpSocketServer();
 }
 
-void Thread::PostDelayed(const Location& posted_from,
-                         int delay_ms,
-                         MessageHandler* phandler,
-                         uint32_t id,
-                         MessageData* pdata) {
-  return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id,
-                     pdata);
-}
-
-void Thread::PostAt(const Location& posted_from,
-                    int64_t run_at_ms,
-                    MessageHandler* phandler,
-                    uint32_t id,
-                    MessageData* pdata) {
-  return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id,
-                     pdata);
-}
-
-void Thread::DoDelayPost(const Location& posted_from,
-                         int64_t delay_ms,
-                         int64_t run_at_ms,
-                         MessageHandler* phandler,
-                         uint32_t id,
-                         MessageData* pdata) {
+void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                          webrtc::TimeDelta delay) {
   if (IsQuitting()) {
-    delete pdata;
     return;
   }
 
@@ -558,15 +507,14 @@
   // Add to the priority queue. Gets sorted soonest first.
   // Signal for the multiplexer to return.
 
+  int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
+  int64_t run_time_ms = TimeAfter(delay_ms);
   {
     CritScope cs(&crit_);
-    Message msg;
-    msg.posted_from = posted_from;
-    msg.phandler = phandler;
-    msg.message_id = id;
-    msg.pdata = pdata;
-    DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg);
-    delayed_messages_.push(delayed);
+    delayed_messages_.push({.delay_ms = delay_ms,
+                            .run_time_ms = run_time_ms,
+                            .message_number = delayed_next_num_,
+                            .functor = std::move(task)});
     // If this message queue processes 1 message every millisecond for 50 days,
     // we will wrap this number.  Even then, only messages with identical times
     // will be misordered, and then only briefly.  This is probably ok.
@@ -583,7 +531,7 @@
     return 0;
 
   if (!delayed_messages_.empty()) {
-    int delay = TimeUntil(delayed_messages_.top().run_time_ms_);
+    int delay = TimeUntil(delayed_messages_.top().run_time_ms);
     if (delay < 0)
       delay = 0;
     return delay;
@@ -592,56 +540,16 @@
   return kForever;
 }
 
-void Thread::ClearInternal(MessageHandler* phandler,
-                           uint32_t id,
-                           MessageList* removed) {
-  // Remove from ordered message queue
-
-  for (auto it = messages_.begin(); it != messages_.end();) {
-    if (it->Match(phandler, id)) {
-      if (removed) {
-        removed->push_back(*it);
-      } else {
-        delete it->pdata;
-      }
-      it = messages_.erase(it);
-    } else {
-      ++it;
-    }
-  }
-
-  // Remove from priority queue. Not directly iterable, so use this approach
-
-  auto new_end = delayed_messages_.container().begin();
-  for (auto it = new_end; it != delayed_messages_.container().end(); ++it) {
-    if (it->msg_.Match(phandler, id)) {
-      if (removed) {
-        removed->push_back(it->msg_);
-      } else {
-        delete it->msg_.pdata;
-      }
-    } else {
-      *new_end++ = *it;
-    }
-  }
-  delayed_messages_.container().erase(new_end,
-                                      delayed_messages_.container().end());
-  delayed_messages_.reheap();
-}
-
-void Thread::Dispatch(Message* pmsg) {
-  TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
-               pmsg->posted_from.file_name(), "src_func",
-               pmsg->posted_from.function_name());
+void Thread::Dispatch(absl::AnyInvocable<void() &&> task) {
+  TRACE_EVENT0("webrtc", "Thread::Dispatch");
   RTC_DCHECK_RUN_ON(this);
   int64_t start_time = TimeMillis();
-  pmsg->phandler->OnMessage(pmsg);
+  std::move(task)();
   int64_t end_time = TimeMillis();
   int64_t diff = TimeDiff(end_time, start_time);
   if (diff >= dispatch_warning_ms_) {
     RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff
-                     << "ms to dispatch. Posted from: "
-                     << pmsg->posted_from.ToString();
+                     << "ms to dispatch.";
     // To avoid log spew, move the warning limit to only give warning
     // for delays that are larger than the one observed.
     dispatch_warning_ms_ = diff + 1;
@@ -986,39 +894,16 @@
   delete this;
 }
 
-void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
-  // Though Post takes MessageData by raw pointer (last parameter), it still
-  // takes it with ownership.
-  Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
-       /*id=*/0, new AnyInvocableMessage(std::move(task)));
-}
-
 void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
                              webrtc::TimeDelta delay) {
   // This implementation does not support low precision yet.
   PostDelayedHighPrecisionTask(std::move(task), delay);
 }
 
-void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
-                                          webrtc::TimeDelta delay) {
-  int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
-  // Though PostDelayed takes MessageData by raw pointer (last parameter),
-  // it still takes it with ownership.
-  PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
-              /*id=*/0, new AnyInvocableMessage(std::move(task)));
-}
-
 bool Thread::IsProcessingMessagesForTesting() {
   return (owned_ || IsCurrent()) && !IsQuitting();
 }
 
-void Thread::Clear(MessageHandler* phandler,
-                   uint32_t id,
-                   MessageList* removed) {
-  CritScope cs(&crit_);
-  ClearInternal(phandler, id, removed);
-}
-
 bool Thread::ProcessMessages(int cmsLoop) {
   // Using ProcessMessages with a custom clock for testing and a time greater
   // than 0 doesn't work, since it's not guaranteed to advance the custom
@@ -1032,10 +917,10 @@
 #if defined(WEBRTC_MAC)
     ScopedAutoReleasePool pool;
 #endif
-    Message msg;
-    if (!Get(&msg, cmsNext))
+    absl::AnyInvocable<void()&&> task = Get(cmsNext);
+    if (!task)
       return !IsQuitting();
-    Dispatch(&msg);
+    Dispatch(std::move(task));
 
     if (cmsLoop != kForever) {
       cmsNext = static_cast<int>(TimeUntil(msEnd));
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 1519976..3fa7533 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -36,12 +36,10 @@
 #include "rtc_base/checks.h"
 #include "rtc_base/deprecated/recursive_critical_section.h"
 #include "rtc_base/location.h"
-#include "rtc_base/message_handler.h"
 #include "rtc_base/platform_thread_types.h"
 #include "rtc_base/socket_server.h"
 #include "rtc_base/system/rtc_export.h"
 #include "rtc_base/thread_annotations.h"
-#include "rtc_base/thread_message.h"
 
 #if defined(WEBRTC_WIN)
 #include "rtc_base/win32.h"
@@ -267,26 +265,6 @@
   // Processed.  Normally, this would be true until IsQuitting() is true.
   virtual bool IsProcessingMessagesForTesting();
 
-  // `time_sensitive` is deprecated and should always be false.
-  virtual void Post(const Location& posted_from,
-                    MessageHandler* phandler,
-                    uint32_t id = 0,
-                    MessageData* pdata = nullptr,
-                    bool time_sensitive = false);
-  virtual void PostDelayed(const Location& posted_from,
-                           int delay_ms,
-                           MessageHandler* phandler,
-                           uint32_t id = 0,
-                           MessageData* pdata = nullptr);
-  virtual void PostAt(const Location& posted_from,
-                      int64_t run_at_ms,
-                      MessageHandler* phandler,
-                      uint32_t id = 0,
-                      MessageData* pdata = nullptr);
-  virtual void Clear(MessageHandler* phandler,
-                     uint32_t id = MQID_ANY,
-                     MessageList* removed = nullptr);
-
   // Amount of time until the next message can be retrieved
   virtual int GetDelay();
 
@@ -427,54 +405,28 @@
 
   // DelayedMessage goes into a priority queue, sorted by trigger time. Messages
   // with the same trigger time are processed in num_ (FIFO) order.
-  class DelayedMessage {
-   public:
-    DelayedMessage(int64_t delay,
-                   int64_t run_time_ms,
-                   uint32_t num,
-                   const Message& msg)
-        : delay_ms_(delay),
-          run_time_ms_(run_time_ms),
-          message_number_(num),
-          msg_(msg) {}
-
+  struct DelayedMessage {
     bool operator<(const DelayedMessage& dmsg) const {
-      return (dmsg.run_time_ms_ < run_time_ms_) ||
-             ((dmsg.run_time_ms_ == run_time_ms_) &&
-              (dmsg.message_number_ < message_number_));
+      return (dmsg.run_time_ms < run_time_ms) ||
+             ((dmsg.run_time_ms == run_time_ms) &&
+              (dmsg.message_number < message_number));
     }
 
-    int64_t delay_ms_;  // for debugging
-    int64_t run_time_ms_;
+    int64_t delay_ms;  // for debugging
+    int64_t run_time_ms;
     // Monotonicaly incrementing number used for ordering of messages
     // targeted to execute at the same time.
-    uint32_t message_number_;
-    Message msg_;
+    uint32_t message_number;
+    // std::priority_queue doesn't allow to extract elements, but functor
+    // is move-only and thus need to be changed when pulled out of the
+    // priority queue. That is ok because `functor` doesn't affect operator<
+    mutable absl::AnyInvocable<void() &&> functor;
   };
 
-  class PriorityQueue : public std::priority_queue<DelayedMessage> {
-   public:
-    container_type& container() { return c; }
-    void reheap() { make_heap(c.begin(), c.end(), comp); }
-  };
-
-  void DoDelayPost(const Location& posted_from,
-                   int64_t cmsDelay,
-                   int64_t tstamp,
-                   MessageHandler* phandler,
-                   uint32_t id,
-                   MessageData* pdata);
-
   // Perform initialization, subclasses must call this from their constructor
   // if false was passed as init_queue to the Thread constructor.
   void DoInit();
 
-  // Does not take any lock. Must be called either while holding crit_, or by
-  // the destructor (by definition, the latter has exclusive access).
-  void ClearInternal(MessageHandler* phandler,
-                     uint32_t id,
-                     MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
-
   // Perform cleanup; subclasses must call this from the destructor,
   // and are not expected to actually hold the lock.
   void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_);
@@ -497,13 +449,11 @@
   static const int kSlowDispatchLoggingThreshold = 50;  // 50 ms
 
   // Get() will process I/O until:
-  //  1) A message is available (returns true)
-  //  2) cmsWait seconds have elapsed (returns false)
-  //  3) Stop() is called (returns false)
-  virtual bool Get(Message* pmsg,
-                   int cmsWait = kForever,
-                   bool process_io = true);
-  virtual void Dispatch(Message* pmsg);
+  //  1) A task is available (returns it)
+  //  2) cmsWait seconds have elapsed (returns empty task)
+  //  3) Stop() is called (returns empty task)
+  absl::AnyInvocable<void() &&> Get(int cmsWait);
+  void Dispatch(absl::AnyInvocable<void() &&> task);
 
   // Sets the per-thread allow-blocking-calls flag and returns the previous
   // value. Must be called on this thread.
@@ -532,8 +482,8 @@
   // Called by the ThreadManager when being unset as the current thread.
   void ClearCurrentTaskQueue();
 
-  MessageList messages_ RTC_GUARDED_BY(crit_);
-  PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_);
+  std::queue<absl::AnyInvocable<void() &&>> messages_ RTC_GUARDED_BY(crit_);
+  std::priority_queue<DelayedMessage> delayed_messages_ RTC_GUARDED_BY(crit_);
   uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_);
 #if RTC_DCHECK_IS_ON
   uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0;
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index c0e93bc..d5b467c 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -553,32 +553,6 @@
   ThreadManager::ProcessAllMessageQueuesForTesting();
 }
 
-// Test that ProcessAllMessageQueues doesn't hang if a queue clears its
-// messages.
-TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) {
-  rtc::AutoThread main_thread;
-  Event entered_process_all_message_queues(true, false);
-  auto t = Thread::CreateWithSocketServer();
-  t->Start();
-
-  auto clearer = [&entered_process_all_message_queues] {
-    // Wait for event as a means to ensure Clear doesn't occur outside of
-    // ProcessAllMessageQueues. The event is set by a message posted to the
-    // main thread, which is guaranteed to be handled inside
-    // ProcessAllMessageQueues.
-    entered_process_all_message_queues.Wait(Event::kForever);
-    rtc::Thread::Current()->Clear(nullptr);
-  };
-  auto event_signaler = [&entered_process_all_message_queues] {
-    entered_process_all_message_queues.Set();
-  };
-
-  // Post messages (both delayed and non delayed) to both threads.
-  t->PostTask(clearer);
-  main_thread.PostTask(event_signaler);
-  ThreadManager::ProcessAllMessageQueuesForTesting();
-}
-
 void WaitAndSetEvent(Event* wait_event, Event* set_event) {
   wait_event->Wait(Event::kForever);
   set_event->Set();
diff --git a/test/time_controller/simulated_thread.cc b/test/time_controller/simulated_thread.cc
index 266b76f..bdd1096 100644
--- a/test/time_controller/simulated_thread.cc
+++ b/test/time_controller/simulated_thread.cc
@@ -77,35 +77,27 @@
   }
 }
 
-void SimulatedThread::Post(const rtc::Location& posted_from,
-                           rtc::MessageHandler* phandler,
-                           uint32_t id,
-                           rtc::MessageData* pdata,
-                           bool time_sensitive) {
-  rtc::Thread::Post(posted_from, phandler, id, pdata, time_sensitive);
+void SimulatedThread::PostTask(absl::AnyInvocable<void() &&> task) {
+  rtc::Thread::PostTask(std::move(task));
   MutexLock lock(&lock_);
   next_run_time_ = Timestamp::MinusInfinity();
 }
 
-void SimulatedThread::PostDelayed(const rtc::Location& posted_from,
-                                  int delay_ms,
-                                  rtc::MessageHandler* phandler,
-                                  uint32_t id,
-                                  rtc::MessageData* pdata) {
-  rtc::Thread::PostDelayed(posted_from, delay_ms, phandler, id, pdata);
+void SimulatedThread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                                      TimeDelta delay) {
+  rtc::Thread::PostDelayedTask(std::move(task), delay);
   MutexLock lock(&lock_);
   next_run_time_ =
-      std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis() + delay_ms));
+      std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay);
 }
 
-void SimulatedThread::PostAt(const rtc::Location& posted_from,
-                             int64_t target_time_ms,
-                             rtc::MessageHandler* phandler,
-                             uint32_t id,
-                             rtc::MessageData* pdata) {
-  rtc::Thread::PostAt(posted_from, target_time_ms, phandler, id, pdata);
+void SimulatedThread::PostDelayedHighPrecisionTask(
+    absl::AnyInvocable<void() &&> task,
+    TimeDelta delay) {
+  rtc::Thread::PostDelayedHighPrecisionTask(std::move(task), delay);
   MutexLock lock(&lock_);
-  next_run_time_ = std::min(next_run_time_, Timestamp::Millis(target_time_ms));
+  next_run_time_ =
+      std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay);
 }
 
 void SimulatedThread::Stop() {
diff --git a/test/time_controller/simulated_thread.h b/test/time_controller/simulated_thread.h
index 9272e28..e8e08c5 100644
--- a/test/time_controller/simulated_thread.h
+++ b/test/time_controller/simulated_thread.h
@@ -37,21 +37,11 @@
 
   // Thread interface
   void BlockingCall(rtc::FunctionView<void()> functor) override;
-  void Post(const rtc::Location& posted_from,
-            rtc::MessageHandler* phandler,
-            uint32_t id,
-            rtc::MessageData* pdata,
-            bool time_sensitive) override;
-  void PostDelayed(const rtc::Location& posted_from,
-                   int delay_ms,
-                   rtc::MessageHandler* phandler,
-                   uint32_t id,
-                   rtc::MessageData* pdata) override;
-  void PostAt(const rtc::Location& posted_from,
-              int64_t target_time_ms,
-              rtc::MessageHandler* phandler,
-              uint32_t id,
-              rtc::MessageData* pdata) override;
+  void PostTask(absl::AnyInvocable<void() &&> task) override;
+  void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+                       TimeDelta delay) override;
+  void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+                                    TimeDelta delay) override;
 
   void Stop() override;