[Sheriff] Revert "Remove MessageHandler[AutoCleanup] dependency from StreamInterface."

This reverts commit eb79dd9ffdc41e4ca86803bfc1317e0961a8a8a6.

Reason for revert: breaks WebRTC roll into Chrome:
https://crrev.com/c/2445696

Sample failure:
https://ci.chromium.org/p/chromium/builders/try/linux-rel/506049
[ RUN      ] PseudoTcpAdapterTest.DeleteOnConnected

Original change's description:
> Remove MessageHandler[AutoCleanup] dependency from StreamInterface.
>
> This includes relying on related types such as MessageData and
> PostEvent functionality inside the StreamInterface itself.
>
> This affects mostly tests but OpenSSLStreamAdapter
> requires special attention.
>
> Bug: webrtc:11988
> Change-Id: Ib5c895f1bdf77bb49e3162bd49718f8a98812d91
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185505
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#32290}

TBR=kwiberg@webrtc.org,tommi@webrtc.org

Change-Id: I23d7a311a73c739eba872a21e6123235465c28cc
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:11988
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186564
Commit-Queue: Marina Ciocea <marinaciocea@webrtc.org>
Reviewed-by: Marina Ciocea <marinaciocea@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32299}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 49008a2..77bff8d 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -835,7 +835,6 @@
     "system:no_unique_address",
     "system:rtc_export",
     "task_utils:pending_task_safety_flag",
-    "task_utils:repeating_task",
     "task_utils:to_queued_task",
     "third_party/base64",
     "third_party/sigslot",
@@ -1426,7 +1425,6 @@
       "memory:fifo_buffer",
       "synchronization:mutex",
       "synchronization:synchronization_unittests",
-      "task_utils:pending_task_safety_flag",
       "task_utils:to_queued_task",
       "third_party/sigslot",
     ]
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index 838fbc6..5c3dd0a 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -20,15 +20,12 @@
   deps = [ "..:checks" ]
 }
 
-# Test only utility.
-# TODO: Tag with `testonly = true` once all depending targets are correctly
-# tagged.
 rtc_library("fifo_buffer") {
   visibility = [
-    ":unittests",
+    "../../p2p:rtc_p2p",
     "..:rtc_base_tests_utils",
     "..:rtc_base_unittests",
-    "../../p2p:rtc_p2p",  # This needs to be fixed.
+    ":unittests",
   ]
   sources = [
     "fifo_buffer.cc",
@@ -37,8 +34,6 @@
   deps = [
     "..:rtc_base",
     "../synchronization:mutex",
-    "../task_utils:pending_task_safety_flag",
-    "../task_utils:to_queued_task",
   ]
 }
 
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
index 3fbea8d..49e9267 100644
--- a/rtc_base/memory/fifo_buffer.cc
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -104,7 +104,7 @@
 
     // if we were full before, and now we're not, post an event
     if (!was_writable && copy > 0) {
-      PostEvent(SE_WRITE, 0);
+      PostEvent(owner_, SE_WRITE, 0);
     }
   }
   return result;
@@ -129,7 +129,7 @@
 
     // if we didn't have any data to read before, and now we do, post an event
     if (!was_readable && copy > 0) {
-      PostEvent(SE_READ, 0);
+      PostEvent(owner_, SE_READ, 0);
     }
   }
   return result;
@@ -155,7 +155,7 @@
   read_position_ = (read_position_ + size) % buffer_length_;
   data_length_ -= size;
   if (!was_writable && size > 0) {
-    PostEvent(SE_WRITE, 0);
+    PostEvent(owner_, SE_WRITE, 0);
   }
 }
 
@@ -185,7 +185,7 @@
   const bool was_readable = (data_length_ > 0);
   data_length_ += size;
   if (!was_readable && size > 0) {
-    PostEvent(SE_READ, 0);
+    PostEvent(owner_, SE_READ, 0);
   }
 }
 
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index bf2edf6..04c4cbf 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -15,8 +15,6 @@
 
 #include "rtc_base/stream.h"
 #include "rtc_base/synchronization/mutex.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/to_queued_task.h"
 
 namespace rtc {
 
@@ -100,12 +98,6 @@
   bool GetWriteRemaining(size_t* size) const;
 
  private:
-  void PostEvent(int events, int err) {
-    owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
-      SignalEvent(this, events, err);
-    }));
-  }
-
   // Helper method that implements ReadOffset. Caller must acquire a lock
   // when calling this method.
   StreamResult ReadOffsetLocked(void* buffer,
@@ -122,8 +114,6 @@
                                  size_t* bytes_written)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
 
-  webrtc::ScopedTaskSafety task_safety_;
-
   // keeps the opened/closed state of the stream
   StreamState state_ RTC_GUARDED_BY(mutex_);
   // the allocated buffer
@@ -135,7 +125,7 @@
   // offset to the readable data
   size_t read_position_ RTC_GUARDED_BY(mutex_);
   // stream callbacks are dispatched on this thread
-  Thread* const owner_;
+  Thread* owner_;
   // object lock
   mutable webrtc::Mutex mutex_;
   RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index 5790b1b..175160c 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -35,7 +35,6 @@
 #include "rtc_base/openssl_identity.h"
 #include "rtc_base/ssl_certificate.h"
 #include "rtc_base/stream.h"
-#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/thread.h"
 #include "rtc_base/time_utils.h"
 #include "system_wrappers/include/field_trial.h"
@@ -284,7 +283,6 @@
 OpenSSLStreamAdapter::OpenSSLStreamAdapter(
     std::unique_ptr<StreamInterface> stream)
     : SSLStreamAdapter(std::move(stream)),
-      owner_(rtc::Thread::Current()),
       state_(SSL_NONE),
       role_(SSL_CLIENT),
       ssl_read_needs_write_(false),
@@ -298,7 +296,6 @@
       support_legacy_tls_protocols_flag_(ShouldAllowLegacyTLSProtocols()) {}
 
 OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
-  timeout_task_.Stop();
   Cleanup(0);
 }
 
@@ -804,33 +801,6 @@
   }
 }
 
-void OpenSSLStreamAdapter::PostEvent(int events, int err) {
-  owner_->PostTask(webrtc::ToQueuedTask(
-      task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
-}
-
-void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
-  // We need to accept 0 delay here as well as >0 delay, because
-  // DTLSv1_get_timeout seems to frequently return 0 ms.
-  RTC_DCHECK_GE(delay_ms, 0);
-  RTC_DCHECK(!timeout_task_.Running());
-
-  timeout_task_ = webrtc::RepeatingTaskHandle::DelayedStart(
-      owner_, webrtc::TimeDelta::Millis(delay_ms),
-      [flag = task_safety_.flag(), this]() {
-        if (flag->alive()) {
-          RTC_DLOG(LS_INFO) << "DTLS timeout expired";
-          timeout_task_.Stop();
-          DTLSv1_handle_timeout(ssl_);
-          ContinueSSL();
-        } else {
-          RTC_NOTREACHED();
-        }
-        // This callback will never run again (stopped above).
-        return webrtc::TimeDelta::PlusInfinity();
-      });
-}
-
 int OpenSSLStreamAdapter::BeginSSL() {
   RTC_DCHECK(state_ == SSL_CONNECTING);
   // The underlying stream has opened.
@@ -881,7 +851,7 @@
   RTC_DCHECK(state_ == SSL_CONNECTING);
 
   // Clear the DTLS timer
-  timeout_task_.Stop();
+  Thread::Current()->Clear(this, MSG_TIMEOUT);
 
   const int code = (role_ == SSL_CLIENT) ? SSL_connect(ssl_) : SSL_accept(ssl_);
   const int ssl_error = SSL_get_error(ssl_, code);
@@ -913,7 +883,9 @@
       struct timeval timeout;
       if (DTLSv1_get_timeout(ssl_, &timeout)) {
         int delay = timeout.tv_sec * 1000 + timeout.tv_usec / 1000;
-        SetTimeout(delay);
+
+        Thread::Current()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_TIMEOUT,
+                                       0);
       }
     } break;
 
@@ -990,7 +962,18 @@
   peer_cert_chain_.reset();
 
   // Clear the DTLS timer
-  timeout_task_.Stop();
+  Thread::Current()->Clear(this, MSG_TIMEOUT);
+}
+
+void OpenSSLStreamAdapter::OnMessage(Message* msg) {
+  // Process our own messages and then pass others to the superclass
+  if (MSG_TIMEOUT == msg->message_id) {
+    RTC_DLOG(LS_INFO) << "DTLS timeout expired";
+    DTLSv1_handle_timeout(ssl_);
+    ContinueSSL();
+  } else {
+    StreamInterface::OnMessage(msg);
+  }
 }
 
 SSL_CTX* OpenSSLStreamAdapter::SetupSSLContext() {
diff --git a/rtc_base/openssl_stream_adapter.h b/rtc_base/openssl_stream_adapter.h
index fbfccd684..d4cde84 100644
--- a/rtc_base/openssl_stream_adapter.h
+++ b/rtc_base/openssl_stream_adapter.h
@@ -26,8 +26,6 @@
 #include "rtc_base/ssl_stream_adapter.h"
 #include "rtc_base/stream.h"
 #include "rtc_base/system/rtc_export.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/repeating_task.h"
 
 namespace rtc {
 
@@ -147,8 +145,7 @@
     SSL_CLOSED       // Clean close
   };
 
-  void PostEvent(int events, int err);
-  void SetTimeout(int delay_ms);
+  enum { MSG_TIMEOUT = MSG_MAX + 1 };
 
   // The following three methods return 0 on success and a negative
   // error code on failure. The error code may be from OpenSSL or -1
@@ -172,6 +169,9 @@
   void Error(const char* context, int err, uint8_t alert, bool signal);
   void Cleanup(uint8_t alert);
 
+  // Override MessageHandler
+  void OnMessage(Message* msg) override;
+
   // Flush the input buffers by reading left bytes (for DTLS)
   void FlushInput(unsigned int left);
 
@@ -192,10 +192,6 @@
            !peer_certificate_digest_value_.empty();
   }
 
-  rtc::Thread* const owner_;
-  webrtc::ScopedTaskSafety task_safety_;
-  webrtc::RepeatingTaskHandle timeout_task_;
-
   SSLState state_;
   SSLRole role_;
   int ssl_error_code_;  // valid when state_ == SSL_ERROR or SSL_CLOSED
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index 1ba2f3e..bfbaf0f 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -26,8 +26,6 @@
 #include "rtc_base/ssl_identity.h"
 #include "rtc_base/ssl_stream_adapter.h"
 #include "rtc_base/stream.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/to_queued_task.h"
 #include "test/field_trial.h"
 
 using ::testing::Combine;
@@ -216,15 +214,7 @@
     out_->Close();
   }
 
- private:
-  void PostEvent(int events, int err) {
-    thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
-      SignalEvent(this, events, err);
-    }));
-  }
-
-  webrtc::ScopedTaskSafety task_safety_;
-  rtc::Thread* const thread_ = rtc::Thread::Current();
+ protected:
   SSLStreamAdapterTestBase* test_base_;
   const std::string side_;
   rtc::StreamInterface* in_;
@@ -286,17 +276,10 @@
 
  protected:
   void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
+
   void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
 
  private:
-  void PostEvent(int events, int err) {
-    thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
-      SignalEvent(this, events, err);
-    }));
-  }
-
-  rtc::Thread* const thread_ = rtc::Thread::Current();
-  webrtc::ScopedTaskSafety task_safety_;
   rtc::BufferQueue buffer_;
 };
 
diff --git a/rtc_base/stream.cc b/rtc_base/stream.cc
index ee72f8d..1b0a4d7 100644
--- a/rtc_base/stream.cc
+++ b/rtc_base/stream.cc
@@ -24,6 +24,7 @@
 ///////////////////////////////////////////////////////////////////////////////
 // StreamInterface
 ///////////////////////////////////////////////////////////////////////////////
+StreamInterface::~StreamInterface() {}
 
 StreamResult StreamInterface::WriteAll(const void* data,
                                        size_t data_len,
@@ -43,12 +44,29 @@
   return result;
 }
 
+void StreamInterface::PostEvent(Thread* t, int events, int err) {
+  t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
+          new StreamEventData(events, err));
+}
+
+void StreamInterface::PostEvent(int events, int err) {
+  PostEvent(Thread::Current(), events, err);
+}
+
 bool StreamInterface::Flush() {
   return false;
 }
 
 StreamInterface::StreamInterface() {}
 
+void StreamInterface::OnMessage(Message* msg) {
+  if (MSG_POST_EVENT == msg->message_id) {
+    StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
+    SignalEvent(this, pe->events, pe->error);
+    delete msg->pdata;
+  }
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 // StreamAdapterInterface
 ///////////////////////////////////////////////////////////////////////////////
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index 9bf11a2..940bfb4 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -48,9 +48,16 @@
 //  SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
 enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 };
 
-class RTC_EXPORT StreamInterface {
+struct StreamEventData : public MessageData {
+  int events, error;
+  StreamEventData(int ev, int er) : events(ev), error(er) {}
+};
+
+class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
  public:
-  virtual ~StreamInterface() {}
+  enum { MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT };
+
+  ~StreamInterface() override;
 
   virtual StreamState GetState() const = 0;
 
@@ -89,6 +96,13 @@
   // certain events will be raised in the future.
   sigslot::signal3<StreamInterface*, int, int> SignalEvent;
 
+  // Like calling SignalEvent, but posts a message to the specified thread,
+  // which will call SignalEvent.  This helps unroll the stack and prevent
+  // re-entrancy.
+  void PostEvent(Thread* t, int events, int err);
+  // Like the aforementioned method, but posts to the current thread.
+  void PostEvent(int events, int err);
+
   // Return true if flush is successful.
   virtual bool Flush();
 
@@ -111,6 +125,9 @@
  protected:
   StreamInterface();
 
+  // MessageHandler Interface
+  void OnMessage(Message* msg) override;
+
  private:
   RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
 };