Deprecate the StreamInterface::SignalEvent sigslot

In its stead, there's now a SetEventCallback() method.

Bug: webrtc:11943
Change-Id: If936d6e1e23e8a584f06feb123ecf2d450ea4145
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/319040
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42187}
diff --git a/p2p/base/dtls_transport.cc b/p2p/base/dtls_transport.cc
index f6f6847..056b5a5 100644
--- a/p2p/base/dtls_transport.cc
+++ b/p2p/base/dtls_transport.cc
@@ -379,7 +379,8 @@
   dtls_->SetMode(rtc::SSL_MODE_DTLS);
   dtls_->SetMaxProtocolVersion(ssl_max_version_);
   dtls_->SetServerRole(*dtls_role_);
-  dtls_->SignalEvent.connect(this, &DtlsTransport::OnDtlsEvent);
+  dtls_->SetEventCallback(
+      [this](int events, int err) { OnDtlsEvent(events, err); });
   if (remote_fingerprint_value_.size() &&
       !dtls_->SetPeerCertificateDigest(
           remote_fingerprint_algorithm_,
@@ -698,9 +699,8 @@
   }
 }
 
-void DtlsTransport::OnDtlsEvent(rtc::StreamInterface* dtls, int sig, int err) {
+void DtlsTransport::OnDtlsEvent(int sig, int err) {
   RTC_DCHECK_RUN_ON(&thread_checker_);
-  RTC_DCHECK(dtls == dtls_.get());
   if (sig & rtc::SE_OPEN) {
     // This is the first time.
     RTC_LOG(LS_INFO) << ToString() << ": DTLS handshake complete.";
diff --git a/p2p/base/dtls_transport.h b/p2p/base/dtls_transport.h
index 109dbf5..ceeaa84 100644
--- a/p2p/base/dtls_transport.h
+++ b/p2p/base/dtls_transport.h
@@ -221,7 +221,7 @@
                     const rtc::SentPacket& sent_packet);
   void OnReadyToSend(rtc::PacketTransportInternal* transport);
   void OnReceivingState(rtc::PacketTransportInternal* transport);
-  void OnDtlsEvent(rtc::StreamInterface* stream_, int sig, int err);
+  void OnDtlsEvent(int sig, int err);
   void OnNetworkRouteChanged(absl::optional<rtc::NetworkRoute> network_route);
   bool SetupDtls();
   void MaybeStartDtls();
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index a7a2ada..f761c36 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1464,6 +1464,8 @@
     "system:rtc_export",
     "third_party/sigslot",
   ]
+
+  absl_deps = [ "//third_party/abseil-cpp/absl/functional:any_invocable" ]
 }
 
 rtc_library("rtc_certificate_generator") {
@@ -2131,6 +2133,7 @@
           "ssl_identity_unittest.cc",
           "ssl_stream_adapter_unittest.cc",
         ]
+        deps += [ ":callback_list" ]
       }
       absl_deps = [
         "//third_party/abseil-cpp/absl/algorithm:container",
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index 357510b..28da5d4 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -296,7 +296,8 @@
 #endif
       ssl_mode_(SSL_MODE_TLS),
       ssl_max_version_(SSL_PROTOCOL_TLS_12) {
-  stream_->SignalEvent.connect(this, &OpenSSLStreamAdapter::OnEvent);
+  stream_->SetEventCallback(
+      [this](int events, int err) { OnEvent(events, err); });
 }
 
 OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
@@ -741,13 +742,10 @@
   // not reached
 }
 
-void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
-                                   int events,
-                                   int err) {
+void OpenSSLStreamAdapter::OnEvent(int events, int err) {
   RTC_DCHECK_RUN_ON(&callback_sequence_);
   int events_to_signal = 0;
   int signal_error = 0;
-  RTC_DCHECK(stream == stream_.get());
 
   if ((events & SE_OPEN)) {
     RTC_DLOG(LS_VERBOSE) << "OpenSSLStreamAdapter::OnEvent SE_OPEN";
diff --git a/rtc_base/openssl_stream_adapter.h b/rtc_base/openssl_stream_adapter.h
index 3ef1363..c3558b3 100644
--- a/rtc_base/openssl_stream_adapter.h
+++ b/rtc_base/openssl_stream_adapter.h
@@ -148,7 +148,7 @@
     SSL_CLOSED       // Clean close
   };
 
-  void OnEvent(StreamInterface* stream, int events, int err);
+  void OnEvent(int events, int err);
 
   void PostEvent(int events, int err);
   void SetTimeout(int delay_ms);
diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc
index 084594f..ec407c5 100644
--- a/rtc_base/ssl_adapter_unittest.cc
+++ b/rtc_base/ssl_adapter_unittest.cc
@@ -321,7 +321,7 @@
     DoHandshake(server_socket_->Accept(nullptr));
   }
 
-  void OnSSLStreamAdapterEvent(rtc::StreamInterface* stream, int sig, int err) {
+  void OnSSLStreamAdapterEvent(int sig, int err) {
     if (sig & rtc::SE_READ) {
       uint8_t buffer[4096] = "";
       size_t read;
@@ -329,7 +329,7 @@
 
       // Read data received from the client and store it in our internal
       // buffer.
-      rtc::StreamResult r = stream->Read(buffer, read, error);
+      rtc::StreamResult r = ssl_stream_adapter_->Read(buffer, read, error);
       if (r == rtc::SR_SUCCESS) {
         buffer[read] = '\0';
         // Here we assume that the buffer is interpretable as string.
@@ -365,8 +365,8 @@
 
     ssl_stream_adapter_->StartSSL();
 
-    ssl_stream_adapter_->SignalEvent.connect(
-        this, &SSLAdapterTestDummyServer::OnSSLStreamAdapterEvent);
+    ssl_stream_adapter_->SetEventCallback(
+        [this](int events, int err) { OnSSLStreamAdapterEvent(events, err); });
   }
 
   const rtc::SSLMode ssl_mode_;
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index fc6532c..6d76c7b 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -20,6 +20,7 @@
 #include "api/array_view.h"
 #include "api/task_queue/pending_task_safety_flag.h"
 #include "rtc_base/buffer_queue.h"
+#include "rtc_base/callback_list.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/gunit.h"
 #include "rtc_base/helpers.h"
@@ -149,17 +150,75 @@
 
 class SSLStreamAdapterTestBase;
 
-class SSLDummyStreamBase : public rtc::StreamInterface,
-                           public sigslot::has_slots<> {
+// StreamWrapper is a middle layer between `stream`, which supports a single
+// event callback, and test classes in this file that need that event forwarded
+// to them. I.e. this class wraps a `stream` object that it delegates all calls
+// to, but for the event callback, `StreamWrapper` additionally provides support
+// for forwarding event notifications to test classes that call
+// `SubscribeStreamEvent()`.
+//
+// This is needed because in this file, tests connect both client and server
+// streams (SSLDummyStream) to the same underlying `stream` objects
+// (see CreateClientStream() and CreateServerStream()).
+class StreamWrapper : public rtc::StreamInterface {
  public:
-  SSLDummyStreamBase(SSLStreamAdapterTestBase* test,
-                     absl::string_view side,
-                     rtc::StreamInterface* in,
-                     rtc::StreamInterface* out)
+  explicit StreamWrapper(std::unique_ptr<rtc::StreamInterface> stream)
+      : stream_(std::move(stream)) {
+    stream_->SetEventCallback([this](int events, int err) {
+      RTC_DCHECK_RUN_ON(&callback_sequence_);
+      callbacks_.Send(events, err);
+      FireEvent(events, err);
+    });
+  }
+
+  template <typename F>
+  void SubscribeStreamEvent(const void* removal_tag, F&& callback) {
+    callbacks_.AddReceiver(removal_tag, std::forward<F>(callback));
+  }
+
+  void UnsubscribeStreamEvent(const void* removal_tag) {
+    callbacks_.RemoveReceivers(removal_tag);
+  }
+
+  rtc::StreamState GetState() const override { return stream_->GetState(); }
+
+  void Close() override { stream_->Close(); }
+
+  rtc::StreamResult Read(rtc::ArrayView<uint8_t> buffer,
+                         size_t& read,
+                         int& error) override {
+    return stream_->Read(buffer, read, error);
+  }
+
+  rtc::StreamResult Write(rtc::ArrayView<const uint8_t> data,
+                          size_t& written,
+                          int& error) override {
+    return stream_->Write(data, written, error);
+  }
+
+ private:
+  const std::unique_ptr<rtc::StreamInterface> stream_;
+  webrtc::CallbackList<int, int> callbacks_;
+};
+
+class SSLDummyStream final : public rtc::StreamInterface {
+ public:
+  SSLDummyStream(SSLStreamAdapterTestBase* test,
+                 absl::string_view side,
+                 StreamWrapper* in,
+                 StreamWrapper* out)
       : test_base_(test), side_(side), in_(in), out_(out), first_packet_(true) {
-    RTC_DCHECK_NE(in, out);
-    in_->SignalEvent.connect(this, &SSLDummyStreamBase::OnEventIn);
-    out_->SignalEvent.connect(this, &SSLDummyStreamBase::OnEventOut);
+    RTC_CHECK(thread_);
+    RTC_CHECK_NE(in, out);
+    in_->SubscribeStreamEvent(
+        this, [this](int events, int err) { OnEventIn(events, err); });
+    out_->SubscribeStreamEvent(
+        this, [this](int events, int err) { OnEventOut(events, err); });
+  }
+
+  ~SSLDummyStream() override {
+    in_->UnsubscribeStreamEvent(this);
+    out_->UnsubscribeStreamEvent(this);
   }
 
   rtc::StreamState GetState() const override { return rtc::SS_OPEN; }
@@ -184,20 +243,20 @@
   }
 
   // Catch readability events on in and pass them up.
-  void OnEventIn(rtc::StreamInterface* stream, int sig, int err) {
+  void OnEventIn(int sig, int err) {
     int mask = (rtc::SE_READ | rtc::SE_CLOSE);
 
     if (sig & mask) {
-      RTC_LOG(LS_VERBOSE) << "SSLDummyStreamBase::OnEventIn side=" << side_
+      RTC_LOG(LS_VERBOSE) << "SSLDummyStream::OnEventIn side=" << side_
                           << " sig=" << sig << " forwarding upward";
       PostEvent(sig & mask, 0);
     }
   }
 
   // Catch writeability events on out and pass them up.
-  void OnEventOut(rtc::StreamInterface* stream, int sig, int err) {
+  void OnEventOut(int sig, int err) {
     if (sig & rtc::SE_WRITE) {
-      RTC_LOG(LS_VERBOSE) << "SSLDummyStreamBase::OnEventOut side=" << side_
+      RTC_LOG(LS_VERBOSE) << "SSLDummyStream::OnEventOut side=" << side_
                           << " sig=" << sig << " forwarding upward";
 
       PostEvent(sig & rtc::SE_WRITE, 0);
@@ -232,20 +291,11 @@
   rtc::Thread* const thread_ = rtc::Thread::Current();
   SSLStreamAdapterTestBase* test_base_;
   const std::string side_;
-  rtc::StreamInterface* in_;
-  rtc::StreamInterface* out_;
+  StreamWrapper* const in_;
+  StreamWrapper* const out_;
   bool first_packet_;
 };
 
-class SSLDummyStreamTLS : public SSLDummyStreamBase {
- public:
-  SSLDummyStreamTLS(SSLStreamAdapterTestBase* test,
-                    absl::string_view side,
-                    rtc::FifoBuffer* in,
-                    rtc::FifoBuffer* out)
-      : SSLDummyStreamBase(test, side, in, out) {}
-};
-
 class BufferQueueStream : public rtc::StreamInterface {
  public:
   BufferQueueStream(size_t capacity, size_t default_size)
@@ -304,15 +354,6 @@
   rtc::BufferQueue buffer_;
 };
 
-class SSLDummyStreamDTLS : public SSLDummyStreamBase {
- public:
-  SSLDummyStreamDTLS(SSLStreamAdapterTestBase* test,
-                     absl::string_view side,
-                     BufferQueueStream* in,
-                     BufferQueueStream* out)
-      : SSLDummyStreamBase(test, side, in, out) {}
-};
-
 static const int kFifoBufferSize = 4096;
 static const int kBufferCapacity = 1;
 static const size_t kDefaultBufferSize = 2048;
@@ -391,11 +432,10 @@
                                     : new ScopedFieldTrials(server_experiment));
       server_ssl_ = rtc::SSLStreamAdapter::Create(CreateServerStream());
     }
-
-    client_ssl_->SignalEvent.connect(this,
-                                     &SSLStreamAdapterTestBase::OnClientEvent);
-    server_ssl_->SignalEvent.connect(this,
-                                     &SSLStreamAdapterTestBase::OnServerEvent);
+    client_ssl_->SetEventCallback(
+        [this](int events, int err) { OnClientEvent(events, err); });
+    server_ssl_->SetEventCallback(
+        [this](int events, int err) { OnServerEvent(events, err); });
   }
 
   // Recreate the client/server identities with the specified validity period.
@@ -648,7 +688,7 @@
     }
   }
 
-  rtc::StreamResult DataWritten(SSLDummyStreamBase* from,
+  rtc::StreamResult DataWritten(SSLDummyStream* from,
                                 const void* data,
                                 size_t data_len,
                                 size_t& written,
@@ -756,13 +796,12 @@
   virtual void TestTransfer(int size) = 0;
 
  private:
-  void OnClientEvent(rtc::StreamInterface* stream, int sig, int err) {
-    RTC_DCHECK_EQ(stream, client_ssl_.get());
+  void OnClientEvent(int sig, int err) {
     RTC_LOG(LS_VERBOSE) << "SSLStreamAdapterTestBase::OnClientEvent sig="
                         << sig;
 
     if (sig & rtc::SE_READ) {
-      ReadData(stream);
+      ReadData(client_ssl_.get());
     }
 
     if (sig & rtc::SE_WRITE) {
@@ -770,12 +809,11 @@
     }
   }
 
-  void OnServerEvent(rtc::StreamInterface* stream, int sig, int err) {
-    RTC_DCHECK_EQ(stream, server_ssl_.get());
+  void OnServerEvent(int sig, int err) {
     RTC_LOG(LS_VERBOSE) << "SSLStreamAdapterTestBase::OnServerEvent sig="
                         << sig;
     if (sig & rtc::SE_READ) {
-      ReadData(stream);
+      ReadData(server_ssl_.get());
     }
   }
 
@@ -819,18 +857,16 @@
                                  "",
                                  false,
                                  ::testing::get<0>(GetParam()),
-                                 ::testing::get<1>(GetParam())),
-        client_buffer_(kFifoBufferSize),
-        server_buffer_(kFifoBufferSize) {}
+                                 ::testing::get<1>(GetParam())) {}
 
   std::unique_ptr<rtc::StreamInterface> CreateClientStream() override final {
     return absl::WrapUnique(
-        new SSLDummyStreamTLS(this, "c2s", &client_buffer_, &server_buffer_));
+        new SSLDummyStream(this, "c2s", &client_buffer_, &server_buffer_));
   }
 
   std::unique_ptr<rtc::StreamInterface> CreateServerStream() override final {
     return absl::WrapUnique(
-        new SSLDummyStreamTLS(this, "s2c", &server_buffer_, &client_buffer_));
+        new SSLDummyStream(this, "s2c", &server_buffer_, &client_buffer_));
   }
 
   // Test data transfer for TLS
@@ -930,8 +966,10 @@
   }
 
  private:
-  rtc::FifoBuffer client_buffer_;
-  rtc::FifoBuffer server_buffer_;
+  StreamWrapper client_buffer_{
+      std::make_unique<rtc::FifoBuffer>(kFifoBufferSize)};
+  StreamWrapper server_buffer_{
+      std::make_unique<rtc::FifoBuffer>(kFifoBufferSize)};
   rtc::MemoryStream send_stream_;
   rtc::MemoryStream recv_stream_;
 };
@@ -940,8 +978,6 @@
  public:
   SSLStreamAdapterTestDTLSBase(rtc::KeyParams param1, rtc::KeyParams param2)
       : SSLStreamAdapterTestBase("", "", true, param1, param2),
-        client_buffer_(kBufferCapacity, kDefaultBufferSize),
-        server_buffer_(kBufferCapacity, kDefaultBufferSize),
         packet_size_(1000),
         count_(0),
         sent_(0) {}
@@ -949,20 +985,18 @@
   SSLStreamAdapterTestDTLSBase(absl::string_view cert_pem,
                                absl::string_view private_key_pem)
       : SSLStreamAdapterTestBase(cert_pem, private_key_pem, true),
-        client_buffer_(kBufferCapacity, kDefaultBufferSize),
-        server_buffer_(kBufferCapacity, kDefaultBufferSize),
         packet_size_(1000),
         count_(0),
         sent_(0) {}
 
   std::unique_ptr<rtc::StreamInterface> CreateClientStream() override final {
     return absl::WrapUnique(
-        new SSLDummyStreamDTLS(this, "c2s", &client_buffer_, &server_buffer_));
+        new SSLDummyStream(this, "c2s", &client_buffer_, &server_buffer_));
   }
 
   std::unique_ptr<rtc::StreamInterface> CreateServerStream() override final {
     return absl::WrapUnique(
-        new SSLDummyStreamDTLS(this, "s2c", &server_buffer_, &client_buffer_));
+        new SSLDummyStream(this, "s2c", &server_buffer_, &client_buffer_));
   }
 
   void WriteData() override {
@@ -1052,8 +1086,10 @@
   }
 
  protected:
-  BufferQueueStream client_buffer_;
-  BufferQueueStream server_buffer_;
+  StreamWrapper client_buffer_{
+      std::make_unique<BufferQueueStream>(kBufferCapacity, kDefaultBufferSize)};
+  StreamWrapper server_buffer_{
+      std::make_unique<BufferQueueStream>(kBufferCapacity, kDefaultBufferSize)};
 
  private:
   size_t packet_size_;
@@ -1075,9 +1111,9 @@
       : SSLStreamAdapterTestDTLSBase(cert_pem, private_key_pem) {}
 };
 
-rtc::StreamResult SSLDummyStreamBase::Write(rtc::ArrayView<const uint8_t> data,
-                                            size_t& written,
-                                            int& error) {
+rtc::StreamResult SSLDummyStream::Write(rtc::ArrayView<const uint8_t> data,
+                                        size_t& written,
+                                        int& error) {
   RTC_LOG(LS_VERBOSE) << "Writing to loopback " << data.size();
 
   if (first_packet_) {
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index 4b2236a..8eb800c 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -13,9 +13,11 @@
 
 #include <memory>
 
+#include "absl/functional/any_invocable.h"
 #include "api/array_view.h"
 #include "api/sequence_checker.h"
 #include "rtc_base/buffer.h"
+#include "rtc_base/logging.h"
 #include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/system/rtc_export.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
@@ -83,15 +85,24 @@
   // signalled as a result of this call.
   virtual void Close() = 0;
 
-  // Streams may signal one or more StreamEvents to indicate state changes.
-  // The first argument identifies the stream on which the state change occured.
-  // The second argument is a bit-wise combination of StreamEvents.
-  // If SE_CLOSE is signalled, then the third argument is the associated error
-  // code.  Otherwise, the value is undefined.
-  // Note: Not all streams will support asynchronous event signalling.  However,
-  // SS_OPENING and SR_BLOCK returned from stream member functions imply that
-  // certain events will be raised in the future.
-  sigslot::signal3<StreamInterface*, int, int> SignalEvent;
+  // Streams may issue one or more events to indicate state changes to a
+  // provided callback.
+  // The first argument is a bit-wise combination of `StreamEvent` flags.
+  // If SE_CLOSE is set, then the second argument is the associated error code.
+  // Otherwise, the value of the second parameter is undefined and should be
+  // set to 0.
+  // Note: Not all streams support callbacks.  However, SS_OPENING and
+  // SR_BLOCK returned from member functions imply that certain callbacks will
+  // be made in the future.
+  void SetEventCallback(absl::AnyInvocable<void(int, int)> callback) {
+    RTC_DCHECK_RUN_ON(&callback_sequence_);
+    RTC_DCHECK(!callback_ || !callback);
+    callback_ = std::move(callback);
+  }
+
+  // TODO(bugs.webrtc.org/11943): Remove after updating downstream code.
+  sigslot::signal3<StreamInterface*, int, int> SignalEvent
+      [[deprecated("Use SetEventCallback instead")]];
 
   // Return true if flush is successful.
   virtual bool Flush();
@@ -126,13 +137,23 @@
 
   // Utility function for derived classes.
   void FireEvent(int stream_events, int err) RTC_RUN_ON(&callback_sequence_) {
+    if (callback_) {
+      callback_(stream_events, err);
+    }
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wdeprecated-declarations"
     // TODO(tommi): This is for backwards compatibility only while `SignalEvent`
-    // is being replaced by `SetEventHandler`.
+    // is being replaced by `SetEventCallback`.
     SignalEvent(this, stream_events, err);
+#pragma clang diagnostic pop
   }
 
   RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker callback_sequence_{
       webrtc::SequenceChecker::kDetached};
+
+ private:
+  absl::AnyInvocable<void(int, int)> callback_
+      RTC_GUARDED_BY(&callback_sequence_) = nullptr;
 };
 
 }  // namespace rtc