[CallbackList] Use CallbackList in AsyncPacketSocket for close events.

This removes use of the SignalClose sigslot. This CL includes thread
checks for the callback list and updates some call sites to unsubscribe
from events before deletion or detaching from a socket instance.

Bug: webrtc:11943
Change-Id: Ib66d39aa5cc795b750c9e3eaa85ed6af8b55b2b5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/258561
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36540}
diff --git a/p2p/base/port_unittest.cc b/p2p/base/port_unittest.cc
index 81d5cfe..1c73c96 100644
--- a/p2p/base/port_unittest.cc
+++ b/p2p/base/port_unittest.cc
@@ -672,8 +672,8 @@
     // Ensure redundant SignalClose events on TcpConnection won't break tcp
     // reconnection. Chromium will fire SignalClose for all outstanding IPC
     // packets during reconnection.
-    tcp_conn1->socket()->SignalClose(tcp_conn1->socket(), 0);
-    tcp_conn2->socket()->SignalClose(tcp_conn2->socket(), 0);
+    tcp_conn1->socket()->NotifyClosedForTest(0);
+    tcp_conn2->socket()->NotifyClosedForTest(0);
 
     // Speed up destroying ch2's connection such that the test is ready to
     // accept a new connection from ch1 before ch1's connection destroys itself.
@@ -1625,7 +1625,7 @@
   lconn->Ping(0);
 
   // Now disconnect the client socket...
-  socket->SignalClose(socket, 1);
+  socket->NotifyClosedForTest(1);
 
   // And prevent new sockets from being created.
   socket_factory.set_next_client_tcp_socket(nullptr);
diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc
index b9d8e85..2964c8b 100644
--- a/p2p/base/tcp_port.cc
+++ b/p2p/base/tcp_port.cc
@@ -519,6 +519,7 @@
     // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have
     // to manually destroy here as this connection, as never connected, will not
     // be scheduled for ping to trigger destroy.
+    socket_->UnsubscribeClose(this);
     Destroy();
   }
 }
@@ -557,6 +558,11 @@
   int opts = (remote_candidate().protocol() == SSLTCP_PROTOCOL_NAME)
                  ? rtc::PacketSocketFactory::OPT_TLS_FAKE
                  : 0;
+
+  if (socket_) {
+    socket_->UnsubscribeClose(this);
+  }
+
   rtc::PacketSocketTcpOptions tcp_opts;
   tcp_opts.opts = opts;
   socket_.reset(port()->socket_factory()->CreateClientTcpSocket(
@@ -590,7 +596,8 @@
   }
   socket->SignalReadPacket.connect(this, &TCPConnection::OnReadPacket);
   socket->SignalReadyToSend.connect(this, &TCPConnection::OnReadyToSend);
-  socket->SignalClose.connect(this, &TCPConnection::OnClose);
+  socket->SubscribeClose(
+      this, [this](rtc::AsyncPacketSocket* s, int err) { OnClose(s, err); });
 }
 
 }  // namespace cricket
diff --git a/p2p/base/turn_port.cc b/p2p/base/turn_port.cc
index e15f352..31b7272 100644
--- a/p2p/base/turn_port.cc
+++ b/p2p/base/turn_port.cc
@@ -306,6 +306,10 @@
   while (!entries_.empty()) {
     DestroyEntry(entries_.front());
   }
+
+  if (socket_)
+    socket_->UnsubscribeClose(this);
+
   if (!SharedSocket()) {
     delete socket_;
   }
@@ -451,7 +455,9 @@
   if (server_address_.proto == PROTO_TCP ||
       server_address_.proto == PROTO_TLS) {
     socket_->SignalConnect.connect(this, &TurnPort::OnSocketConnect);
-    socket_->SignalClose.connect(this, &TurnPort::OnSocketClose);
+    socket_->SubscribeClose(this, [this](rtc::AsyncPacketSocket* s, int err) {
+      OnSocketClose(s, err);
+    });
   } else {
     state_ = STATE_CONNECTED;
   }
@@ -542,6 +548,9 @@
                    << ": Allocating a new socket after "
                       "STUN_ERROR_ALLOCATION_MISMATCH, retry: "
                    << allocate_mismatch_retries_ + 1;
+
+  socket_->UnsubscribeClose(this);
+
   if (SharedSocket()) {
     ResetSharedSocket();
   } else {
diff --git a/p2p/base/turn_port_unittest.cc b/p2p/base/turn_port_unittest.cc
index d1b911f..2244b9d 100644
--- a/p2p/base/turn_port_unittest.cc
+++ b/p2p/base/turn_port_unittest.cc
@@ -1305,7 +1305,7 @@
                                                   Port::ORIGIN_MESSAGE);
   EXPECT_NE(nullptr, conn);
   EXPECT_TRUE(!turn_port_->connections().empty());
-  turn_port_->socket()->SignalClose(turn_port_->socket(), 1);
+  turn_port_->socket()->NotifyClosedForTest(1);
   EXPECT_TRUE_SIMULATED_WAIT(turn_port_->connections().empty(),
                              kConnectionDestructionDelay, fake_clock_);
 }
diff --git a/p2p/base/turn_server.cc b/p2p/base/turn_server.cc
index 6a5d7a9..85baf75 100644
--- a/p2p/base/turn_server.cc
+++ b/p2p/base/turn_server.cc
@@ -195,7 +195,10 @@
     cricket::AsyncStunTCPSocket* tcp_socket =
         new cricket::AsyncStunTCPSocket(accepted_socket);
 
-    tcp_socket->SignalClose.connect(this, &TurnServer::OnInternalSocketClose);
+    tcp_socket->SubscribeClose(this,
+                               [this](rtc::AsyncPacketSocket* s, int err) {
+                                 OnInternalSocketClose(s, err);
+                               });
     // Finally add the socket so it can start communicating with the client.
     AddInternalSocket(tcp_socket, info.proto);
   }
@@ -564,6 +567,7 @@
   InternalSocketMap::iterator iter = server_sockets_.find(socket);
   if (iter != server_sockets_.end()) {
     rtc::AsyncPacketSocket* socket = iter->first;
+    socket->UnsubscribeClose(this);
     socket->SignalReadPacket.disconnect(this);
     server_sockets_.erase(iter);
     std::unique_ptr<rtc::AsyncPacketSocket> socket_to_delete =
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 5b3776a..bf39ebe 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -935,6 +935,7 @@
   deps = [
     ":async_resolver_interface",
     ":async_socket",
+    ":callback_list",
     ":checks",
     ":ip_address",
     ":logging",
diff --git a/rtc_base/async_packet_socket.cc b/rtc_base/async_packet_socket.cc
index d5435d7..1ce0d3b 100644
--- a/rtc_base/async_packet_socket.cc
+++ b/rtc_base/async_packet_socket.cc
@@ -24,10 +24,24 @@
 PacketOptions::PacketOptions(const PacketOptions& other) = default;
 PacketOptions::~PacketOptions() = default;
 
-AsyncPacketSocket::AsyncPacketSocket() = default;
+AsyncPacketSocket::AsyncPacketSocket() {
+  network_checker_.Detach();
+}
 
 AsyncPacketSocket::~AsyncPacketSocket() = default;
 
+void AsyncPacketSocket::SubscribeClose(
+    const void* removal_tag,
+    std::function<void(AsyncPacketSocket*, int)> callback) {
+  RTC_DCHECK_RUN_ON(&network_checker_);
+  on_close_.AddReceiver(removal_tag, std::move(callback));
+}
+
+void AsyncPacketSocket::UnsubscribeClose(const void* removal_tag) {
+  RTC_DCHECK_RUN_ON(&network_checker_);
+  on_close_.RemoveReceivers(removal_tag);
+}
+
 void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
                                        const AsyncPacketSocket& socket_from,
                                        bool is_connectionless,
diff --git a/rtc_base/async_packet_socket.h b/rtc_base/async_packet_socket.h
index 2e334ec..aa31e25 100644
--- a/rtc_base/async_packet_socket.h
+++ b/rtc_base/async_packet_socket.h
@@ -13,9 +13,12 @@
 
 #include <vector>
 
+#include "api/sequence_checker.h"
+#include "rtc_base/callback_list.h"
 #include "rtc_base/dscp.h"
 #include "rtc_base/network/sent_packet.h"
 #include "rtc_base/socket.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/system/rtc_export.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
 #include "rtc_base/time_utils.h"
@@ -100,6 +103,11 @@
   virtual int GetError() const = 0;
   virtual void SetError(int error) = 0;
 
+  // Register a callback to be called when the socket is closed.
+  void SubscribeClose(const void* removal_tag,
+                      std::function<void(AsyncPacketSocket*, int)> callback);
+  void UnsubscribeClose(const void* removal_tag);
+
   // Emitted each time a packet is read. Used only for UDP and
   // connected TCP sockets.
   sigslot::signal5<AsyncPacketSocket*,
@@ -126,9 +134,25 @@
   // CONNECTING to CONNECTED.
   sigslot::signal1<AsyncPacketSocket*> SignalConnect;
 
-  // Emitted for client TCP sockets when state is changed from
-  // CONNECTED to CLOSED.
-  sigslot::signal2<AsyncPacketSocket*, int> SignalClose;
+  void NotifyClosedForTest(int err) { NotifyClosed(err); }
+
+ protected:
+  // TODO(bugs.webrtc.org/11943): Remove after updating downstream code.
+  void SignalClose(AsyncPacketSocket* s, int err) {
+    RTC_DCHECK_EQ(s, this);
+    NotifyClosed(err);
+  }
+
+  void NotifyClosed(int err) {
+    RTC_DCHECK_RUN_ON(&network_checker_);
+    on_close_.Send(this, err);
+  }
+
+  RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker network_checker_;
+
+ private:
+  webrtc::CallbackList<AsyncPacketSocket*, int> on_close_
+      RTC_GUARDED_BY(&network_checker_);
 };
 
 // Listen socket, producing an AsyncPacketSocket when a peer connects.
diff --git a/rtc_base/async_tcp_socket.cc b/rtc_base/async_tcp_socket.cc
index c248075..d29eafd 100644
--- a/rtc_base/async_tcp_socket.cc
+++ b/rtc_base/async_tcp_socket.cc
@@ -68,7 +68,6 @@
       max_outsize_(max_packet_size) {
   inbuf_.EnsureCapacity(kMinimumRecvSize);
 
-  RTC_DCHECK(socket_.get() != nullptr);
   socket_->SignalConnectEvent.connect(this,
                                       &AsyncTCPSocketBase::OnConnectEvent);
   socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
@@ -237,7 +236,7 @@
 }
 
 void AsyncTCPSocketBase::OnCloseEvent(Socket* socket, int error) {
-  SignalClose(this, error);
+  NotifyClosed(error);
 }
 
 // AsyncTCPSocket
diff --git a/rtc_base/test_echo_server.h b/rtc_base/test_echo_server.h
index ba5f997..e4f70ca 100644
--- a/rtc_base/test_echo_server.h
+++ b/rtc_base/test_echo_server.h
@@ -45,7 +45,8 @@
     if (raw_socket) {
       AsyncTCPSocket* packet_socket = new AsyncTCPSocket(raw_socket);
       packet_socket->SignalReadPacket.connect(this, &TestEchoServer::OnPacket);
-      packet_socket->SignalClose.connect(this, &TestEchoServer::OnClose);
+      packet_socket->SubscribeClose(
+          this, [this](AsyncPacketSocket* s, int err) { OnClose(s, err); });
       client_sockets_.push_back(packet_socket);
     }
   }