Revert "Removing MessageHandler dependency from Connection."

This reverts commit 3202e29f72b4f511fcf6e92ef9b0dcbfee6089ff.

Reason for revert: Introduced a crash in the task posted by Destroy()

Original change's description:
> Removing MessageHandler dependency from Connection.
>
> Bug: webrtc:11988
> Change-Id: Ic35bb5baeafbda7210012dceb0d6d5f5b3eb95c9
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249941
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35890}

No-Try: True
Bug: webrtc:11988
Change-Id: Ie70ee145fde75b8cf76b02784176970e7a78e001
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/252541
Auto-Submit: Taylor Brandstetter <deadbeef@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Owners-Override: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36078}
diff --git a/p2p/base/connection.cc b/p2p/base/connection.cc
index a04f318..8575d65 100644
--- a/p2p/base/connection.cc
+++ b/p2p/base/connection.cc
@@ -18,7 +18,6 @@
 #include <vector>
 
 #include "absl/algorithm/container.h"
-#include "absl/memory/memory.h"
 #include "absl/strings/match.h"
 #include "p2p/base/port_allocator.h"
 #include "rtc_base/checks.h"
@@ -834,25 +833,15 @@
 
 void Connection::Destroy() {
   RTC_DCHECK_RUN_ON(network_thread_);
-  RTC_DLOG(LS_VERBOSE) << ToString() << ": Connection destroyed";
-
-  // Fire the 'destroyed' event before deleting the object. This is done
-  // intentionally to avoid a situation whereby the signal might have dangling
-  // pointers to objects that have been deleted by the time the async task
-  // that deletes the connection object runs.
-  SignalDestroyed(this);
-  SignalDestroyed.disconnect_all();
-
+  // TODO(deadbeef, nisse): This may leak if an application closes a
+  // PeerConnection and then quickly destroys the PeerConnectionFactory (along
+  // with the networking thread on which this message is posted). Also affects
+  // tests, with a workaround in
+  // AutoSocketServerThread::~AutoSocketServerThread.
+  RTC_LOG(LS_VERBOSE) << ToString() << ": Connection destroyed";
+  // TODO(bugs.webrtc.org/11988): Use PostTask.
+  port_->thread()->Post(RTC_FROM_HERE, this, MSG_DELETE);
   LogCandidatePairConfig(webrtc::IceCandidatePairConfigType::kDestroyed);
-
-  // Unwind the stack before deleting the object in case upstream callers
-  // need to refer to the Connection's state as part of teardown.
-  // NOTE: We move ownership of 'this' into the capture section of the lambda
-  // so that the object will always be deleted, including if PostTask fails.
-  // In such a case (only tests), deletion would happen inside of the call
-  // to `Destroy()`.
-  network_thread_->PostTask(
-      webrtc::ToQueuedTask([me = absl::WrapUnique(this)]() {}));
 }
 
 void Connection::FailAndDestroy() {
@@ -1433,6 +1422,15 @@
   }
 }
 
+void Connection::OnMessage(rtc::Message* pmsg) {
+  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK(pmsg->message_id == MSG_DELETE);
+  RTC_LOG(LS_INFO) << "Connection deleted with number of pings sent: "
+                   << num_pings_sent_;
+  SignalDestroyed(this);
+  delete this;
+}
+
 int64_t Connection::last_received() const {
   RTC_DCHECK_RUN_ON(network_thread_);
   return std::max(last_data_received_,
diff --git a/p2p/base/connection.h b/p2p/base/connection.h
index 8254706..c102194 100644
--- a/p2p/base/connection.h
+++ b/p2p/base/connection.h
@@ -70,7 +70,9 @@
 
 // Represents a communication link between a port on the local client and a
 // port on the remote client.
-class Connection : public CandidatePairInterface, public sigslot::has_slots<> {
+class Connection : public CandidatePairInterface,
+                   public rtc::MessageHandlerAutoCleanup,
+                   public sigslot::has_slots<> {
  public:
   struct SentPing {
     SentPing(const std::string id, int64_t sent_time, uint32_t nomination)
@@ -318,6 +320,8 @@
   void set_remote_nomination(uint32_t remote_nomination);
 
  protected:
+  enum { MSG_DELETE = 0, MSG_FIRST_AVAILABLE };
+
   // Constructs a new connection to the given remote port.
   Connection(Port* port, size_t index, const Candidate& candidate);
 
@@ -347,6 +351,8 @@
   void set_state(IceCandidatePairState state);
   void set_connected(bool value);
 
+  void OnMessage(rtc::Message* pmsg) override;
+
   // The local port where this connection sends and receives packets.
   Port* port() { return port_; }
   const Port* port() const { return port_; }
diff --git a/p2p/base/p2p_transport_channel_unittest.cc b/p2p/base/p2p_transport_channel_unittest.cc
index 0a24f8c..7aed20a 100644
--- a/p2p/base/p2p_transport_channel_unittest.cc
+++ b/p2p/base/p2p_transport_channel_unittest.cc
@@ -483,9 +483,6 @@
     ep2_.cd1_.ch_.reset();
     ep1_.cd2_.ch_.reset();
     ep2_.cd2_.ch_.reset();
-    // Process pending tasks that need to run for cleanup purposes such as
-    // pending deletion of Connection objects (see Connection::Destroy).
-    rtc::Thread::Current()->ProcessMessages(0);
   }
   P2PTransportChannel* ep1_ch1() { return ep1_.cd1_.ch_.get(); }
   P2PTransportChannel* ep1_ch2() { return ep1_.cd2_.ch_.get(); }
diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc
index 445b0d0..9d54207 100644
--- a/p2p/base/tcp_port.cc
+++ b/p2p/base/tcp_port.cc
@@ -79,7 +79,6 @@
 #include "rtc_base/logging.h"
 #include "rtc_base/net_helper.h"
 #include "rtc_base/rate_tracker.h"
-#include "rtc_base/task_utils/to_queued_task.h"
 #include "rtc_base/third_party/sigslot/sigslot.h"
 
 namespace cricket {
@@ -367,9 +366,7 @@
   }
 }
 
-TCPConnection::~TCPConnection() {
-  RTC_DCHECK_RUN_ON(network_thread_);
-}
+TCPConnection::~TCPConnection() {}
 
 int TCPConnection::Send(const void* data,
                         size_t size,
@@ -496,20 +493,11 @@
     // events.
     pretending_to_be_writable_ = true;
 
-    // If this connection can't become connected and writable again in 5
-    // seconds, it's time to tear this down. This is the case for the original
-    // TCP connection on passive side during a reconnect.
     // We don't attempt reconnect right here. This is to avoid a case where the
     // shutdown is intentional and reconnect is not necessary. We only reconnect
     // when the connection is used to Send() or Ping().
-    port()->thread()->PostDelayedTask(
-        webrtc::ToQueuedTask(network_safety_,
-                             [this]() {
-                               if (pretending_to_be_writable_) {
-                                 Destroy();
-                               }
-                             }),
-        reconnection_timeout());
+    port()->thread()->PostDelayed(RTC_FROM_HERE, reconnection_timeout(), this,
+                                  MSG_TCPCONNECTION_DELAYED_ONCLOSE);
   } else if (!pretending_to_be_writable_) {
     // OnClose could be called when the underneath socket times out during the
     // initial connect() (i.e. `pretending_to_be_writable_` is false) . We have
@@ -519,6 +507,24 @@
   }
 }
 
+void TCPConnection::OnMessage(rtc::Message* pmsg) {
+  switch (pmsg->message_id) {
+    case MSG_TCPCONNECTION_DELAYED_ONCLOSE:
+      // If this connection can't become connected and writable again in 5
+      // seconds, it's time to tear this down. This is the case for the original
+      // TCP connection on passive side during a reconnect.
+      if (pretending_to_be_writable_) {
+        Destroy();
+      }
+      break;
+    case MSG_TCPCONNECTION_FAILED_CREATE_SOCKET:
+      FailAndPrune();
+      break;
+    default:
+      Connection::OnMessage(pmsg);
+  }
+}
+
 void TCPConnection::MaybeReconnect() {
   // Only reconnect for an outgoing TCPConnection when OnClose was signaled and
   // no outstanding reconnect is pending.
@@ -570,13 +576,13 @@
   } else {
     RTC_LOG(LS_WARNING) << ToString() << ": Failed to create connection to "
                         << remote_candidate().address().ToSensitiveString();
-    set_state(IceCandidatePairState::FAILED);
     // We can't FailAndPrune directly here. FailAndPrune and deletes all
     // the StunRequests from the request_map_. And if this is in the stack
     // of Connection::Ping(), we are still using the request.
     // Unwind the stack and defer the FailAndPrune.
-    port()->thread()->PostTask(
-        webrtc::ToQueuedTask(network_safety_, [this]() { FailAndPrune(); }));
+    set_state(IceCandidatePairState::FAILED);
+    port()->thread()->Post(RTC_FROM_HERE, this,
+                           MSG_TCPCONNECTION_FAILED_CREATE_SOCKET);
   }
 }
 
diff --git a/p2p/base/tcp_port.h b/p2p/base/tcp_port.h
index 07d483c..932af50 100644
--- a/p2p/base/tcp_port.h
+++ b/p2p/base/tcp_port.h
@@ -20,7 +20,6 @@
 #include "p2p/base/port.h"
 #include "rtc_base/async_packet_socket.h"
 #include "rtc_base/containers/flat_map.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
 
 namespace cricket {
 
@@ -136,6 +135,8 @@
 
   rtc::AsyncPacketSocket* socket() { return socket_.get(); }
 
+  void OnMessage(rtc::Message* pmsg) override;
+
   // Allow test cases to overwrite the default timeout period.
   int reconnection_timeout() const { return reconnection_timeout_; }
   void set_reconnection_timeout(int timeout_in_ms) {
@@ -143,6 +144,11 @@
   }
 
  protected:
+  enum {
+    MSG_TCPCONNECTION_DELAYED_ONCLOSE = Connection::MSG_FIRST_AVAILABLE,
+    MSG_TCPCONNECTION_FAILED_CREATE_SOCKET,
+  };
+
   // Set waiting_for_stun_binding_complete_ to false to allow data packets in
   // addition to what Port::OnConnectionRequestResponse does.
   void OnConnectionRequestResponse(ConnectionRequest* req,
@@ -184,8 +190,6 @@
   // Allow test case to overwrite the default timeout period.
   int reconnection_timeout_;
 
-  webrtc::ScopedTaskSafety network_safety_;
-
   friend class TCPPort;
 };
 
diff --git a/pc/scenario_tests/goog_cc_test.cc b/pc/scenario_tests/goog_cc_test.cc
index f0a30df..82ae47b 100644
--- a/pc/scenario_tests/goog_cc_test.cc
+++ b/pc/scenario_tests/goog_cc_test.cc
@@ -100,9 +100,6 @@
     s.ProcessMessages(TimeDelta::Seconds(1));
     EXPECT_GE(get_bwe(), initial_bwe);
   }
-
-  caller->pc()->Close();
-  callee->pc()->Close();
 }
 
 }  // namespace test
diff --git a/test/peer_scenario/tests/remote_estimate_test.cc b/test/peer_scenario/tests/remote_estimate_test.cc
index 9190f5c..429a5b4 100644
--- a/test/peer_scenario/tests/remote_estimate_test.cc
+++ b/test/peer_scenario/tests/remote_estimate_test.cc
@@ -102,8 +102,6 @@
         }
       });
   RTC_CHECK(s.WaitAndProcess(&received_abs_send_time));
-  caller->pc()->Close();
-  callee->pc()->Close();
 }
 }  // namespace test
 }  // namespace webrtc
diff --git a/test/peer_scenario/tests/unsignaled_stream_test.cc b/test/peer_scenario/tests/unsignaled_stream_test.cc
index 4f478b4..5cb0405 100644
--- a/test/peer_scenario/tests/unsignaled_stream_test.cc
+++ b/test/peer_scenario/tests/unsignaled_stream_test.cc
@@ -254,8 +254,6 @@
       });
   EXPECT_TRUE(s.WaitAndProcess(&offer_exchange_done));
   EXPECT_TRUE(s.WaitAndProcess(&second_sink.frame_observed_));
-  caller->pc()->Close();
-  callee->pc()->Close();
 }
 
 INSTANTIATE_TEST_SUITE_P(