Introduce StreamInterface::FireEvent for firing stream events

This is a step towards removing StreamInterface::SignalEvent.
Downstream dependency will need to be updated to call FireEvent()
before further changes can land in webrtc.

Bug: webrtc:11943
Change-Id: Ia7d3f1c43fda52b7cf5bfa082aef3f462553cd67
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347884
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42143}
diff --git a/p2p/base/dtls_transport.cc b/p2p/base/dtls_transport.cc
index 42353e2..f6f6847 100644
--- a/p2p/base/dtls_transport.cc
+++ b/p2p/base/dtls_transport.cc
@@ -79,7 +79,7 @@
 rtc::StreamResult StreamInterfaceChannel::Read(rtc::ArrayView<uint8_t> buffer,
                                                size_t& read,
                                                int& error) {
-  RTC_DCHECK_RUN_ON(&sequence_checker_);
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
 
   if (state_ == rtc::SS_CLOSED)
     return rtc::SR_EOS;
@@ -97,7 +97,7 @@
     rtc::ArrayView<const uint8_t> data,
     size_t& written,
     int& error) {
-  RTC_DCHECK_RUN_ON(&sequence_checker_);
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   // Always succeeds, since this is an unreliable transport anyway.
   // TODO(zhihuang): Should this block if ice_transport_'s temporarily
   // unwritable?
@@ -109,7 +109,7 @@
 }
 
 bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
-  RTC_DCHECK_RUN_ON(&sequence_checker_);
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   if (packets_.size() > 0) {
     RTC_LOG(LS_WARNING) << "Packet already in queue.";
   }
@@ -121,17 +121,17 @@
     // packet currently in packets_.
     RTC_LOG(LS_ERROR) << "Failed to write packet to queue.";
   }
-  SignalEvent(this, rtc::SE_READ, 0);
+  FireEvent(rtc::SE_READ, 0);
   return ret;
 }
 
 rtc::StreamState StreamInterfaceChannel::GetState() const {
-  RTC_DCHECK_RUN_ON(&sequence_checker_);
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   return state_;
 }
 
 void StreamInterfaceChannel::Close() {
-  RTC_DCHECK_RUN_ON(&sequence_checker_);
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   packets_.Clear();
   state_ = rtc::SS_CLOSED;
 }
diff --git a/p2p/base/dtls_transport.h b/p2p/base/dtls_transport.h
index f479325..109dbf5 100644
--- a/p2p/base/dtls_transport.h
+++ b/p2p/base/dtls_transport.h
@@ -58,10 +58,9 @@
                           int& error) override;
 
  private:
-  RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
   IceTransportInternal* const ice_transport_;  // owned by DtlsTransport
-  rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_);
-  rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_);
+  rtc::StreamState state_ RTC_GUARDED_BY(callback_sequence_);
+  rtc::BufferQueue packets_ RTC_GUARDED_BY(callback_sequence_);
 };
 
 // This class provides a DTLS SSLStreamAdapter inside a TransportChannel-style
@@ -235,7 +234,7 @@
   // Sets the DTLS state, signaling if necessary.
   void set_dtls_state(webrtc::DtlsTransportState state);
 
-  webrtc::SequenceChecker thread_checker_;
+  RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_;
 
   const int component_;
   webrtc::DtlsTransportState dtls_state_ = webrtc::DtlsTransportState::kNew;
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index c582863..69b4343 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1446,8 +1446,11 @@
   deps = [
     ":buffer",
     ":checks",
+    ":logging",
     ":threading",
     "../api:array_view",
+    "../api:sequence_checker",
+    "system:no_unique_address",
     "system:rtc_export",
     "third_party/sigslot",
   ]
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index a225c68..658a3c7 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -78,9 +78,11 @@
 
  private:
   void PostEvent(int events, int err) {
-    owner_->PostTask(webrtc::SafeTask(
-        task_safety_.flag(),
-        [this, events, err]() { SignalEvent(this, events, err); }));
+    owner_->PostTask(
+        webrtc::SafeTask(task_safety_.flag(), [this, events, err]() {
+          RTC_DCHECK_RUN_ON(&callback_sequence_);
+          FireEvent(events, err);
+        }));
   }
 
   // Helper method that implements Read. Caller must acquire a lock
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index e2c242b..357510b 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -744,6 +744,7 @@
 void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
                                    int events,
                                    int err) {
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   int events_to_signal = 0;
   int signal_error = 0;
   RTC_DCHECK(stream == stream_.get());
@@ -800,13 +801,14 @@
   if (events_to_signal) {
     // Note that the adapter presents itself as the origin of the stream events,
     // since users of the adapter may not recognize the adapted object.
-    SignalEvent(this, events_to_signal, signal_error);
+    FireEvent(events_to_signal, signal_error);
   }
 }
 
 void OpenSSLStreamAdapter::PostEvent(int events, int err) {
   owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
-    SignalEvent(this, events, err);
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
+    FireEvent(events, err);
   }));
 }
 
@@ -885,8 +887,9 @@
 }
 
 int OpenSSLStreamAdapter::ContinueSSL() {
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   RTC_DLOG(LS_VERBOSE) << "ContinueSSL";
-  RTC_DCHECK(state_ == SSL_CONNECTING);
+  RTC_DCHECK_EQ(state_, SSL_CONNECTING);
 
   // Clear the DTLS timer
   timeout_task_.Stop();
@@ -911,7 +914,7 @@
         // The caller of ContinueSSL may be the same object listening for these
         // events and may not be prepared for reentrancy.
         // PostEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
-        SignalEvent(this, SE_OPEN | SE_READ | SE_WRITE, 0);
+        FireEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
       }
       break;
 
@@ -950,13 +953,14 @@
                                  int err,
                                  uint8_t alert,
                                  bool signal) {
+  RTC_DCHECK_RUN_ON(&callback_sequence_);
   RTC_LOG(LS_WARNING) << "OpenSSLStreamAdapter::Error(" << context << ", "
                       << err << ", " << static_cast<int>(alert) << ")";
   state_ = SSL_ERROR;
   ssl_error_code_ = err;
   Cleanup(alert);
   if (signal) {
-    SignalEvent(this, SE_CLOSE, err);
+    FireEvent(SE_CLOSE, err);
   }
 }
 
diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc
index d0aedb8..084594f 100644
--- a/rtc_base/ssl_adapter_unittest.cc
+++ b/rtc_base/ssl_adapter_unittest.cc
@@ -216,21 +216,25 @@
 
  private:
   void OnConnectEvent(rtc::Socket* socket) {
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
     RTC_DCHECK_EQ(socket, socket_.get());
-    SignalEvent(this, rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
+    FireEvent(rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
   }
 
   void OnReadEvent(rtc::Socket* socket) {
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
     RTC_DCHECK_EQ(socket, socket_.get());
-    SignalEvent(this, rtc::SE_READ, 0);
+    FireEvent(rtc::SE_READ, 0);
   }
   void OnWriteEvent(rtc::Socket* socket) {
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
     RTC_DCHECK_EQ(socket, socket_.get());
-    SignalEvent(this, rtc::SE_WRITE, 0);
+    FireEvent(rtc::SE_WRITE, 0);
   }
   void OnCloseEvent(rtc::Socket* socket, int err) {
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
     RTC_DCHECK_EQ(socket, socket_.get());
-    SignalEvent(this, rtc::SE_CLOSE, err);
+    FireEvent(rtc::SE_CLOSE, err);
   }
 
   std::unique_ptr<rtc::Socket> socket_;
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index 3389218..fc6532c 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -223,7 +223,8 @@
  private:
   void PostEvent(int events, int err) {
     thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
-      SignalEvent(this, events, err);
+      RTC_DCHECK_RUN_ON(&callback_sequence_);
+      FireEvent(events, err);
     }));
   }
 
@@ -293,7 +294,8 @@
  private:
   void PostEvent(int events, int err) {
     thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
-      SignalEvent(this, events, err);
+      RTC_DCHECK_RUN_ON(&callback_sequence_);
+      FireEvent(events, err);
     }));
   }
 
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index e02349a..4b2236a 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -14,7 +14,9 @@
 #include <memory>
 
 #include "api/array_view.h"
+#include "api/sequence_checker.h"
 #include "rtc_base/buffer.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/system/rtc_export.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
 #include "rtc_base/thread.h"
@@ -121,6 +123,16 @@
 
  protected:
   StreamInterface();
+
+  // Utility function for derived classes.
+  void FireEvent(int stream_events, int err) RTC_RUN_ON(&callback_sequence_) {
+    // TODO(tommi): This is for backwards compatibility only while `SignalEvent`
+    // is being replaced by `SetEventHandler`.
+    SignalEvent(this, stream_events, err);
+  }
+
+  RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker callback_sequence_{
+      webrtc::SequenceChecker::kDetached};
 };
 
 }  // namespace rtc