Reland "Make deletion of Connection objects more deterministic."

This reverts commit 869c87a2b9d9f4194d77dd30dc4175a2ecf28a74.

Reason for revert: Re-landing

Original change's description:
> Revert "Make deletion of Connection objects more deterministic."
>
> This reverts commit 942cac2e9e6a205fd673dc003a051cfb3867f2e3.
>
> Reason for revert: Reverting while downstream updates are made.
>
> Original change's description:
> > Make deletion of Connection objects more deterministic.
> >
> > This changes most deletion paths of Connection objects to go through
> > the owner class of the Connection instances, Port.
> >
> > In situations where Connection objects still need to be deleted
> > asynchronously, `async = true` can be passed to
> > `Port::DestroyConnection` and get the same behavior as
> > `Connection::Destroy` formerly gave.
> >
> > The `Destroy()` method still exists for downstream compatibility, but
> > instead of deleting connection objects asynchronously, the deletion
> > now happens synchronously via the Port class.
> >
> > Bug: webrtc:13892, webrtc:13865
> > Change-Id: I07edb7bb5e5d93b33542581b4b09def548de9e12
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/259826
> > Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> > Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> > Cr-Commit-Position: refs/heads/main@{#36676}
>
> Bug: webrtc:13892, webrtc:13865
> Change-Id: I37a15692c8201716402ba5c10f249e4d3754ce4c
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/260862
> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#36736}

Bug: webrtc:13892, webrtc:13865
Change-Id: I29da6c8899d8550c26ccecbbd0fe5f5556c80212
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/260943
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37171}
diff --git a/p2p/base/connection.cc b/p2p/base/connection.cc
index 58adb64..7e4633a 100644
--- a/p2p/base/connection.cc
+++ b/p2p/base/connection.cc
@@ -18,7 +18,6 @@
 #include <vector>
 
 #include "absl/algorithm/container.h"
-#include "absl/memory/memory.h"
 #include "absl/strings/match.h"
 #include "p2p/base/port_allocator.h"
 #include "rtc_base/checks.h"
@@ -243,6 +242,7 @@
 
 Connection::~Connection() {
   RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK(!port_);
 }
 
 webrtc::TaskQueueBase* Connection::network_thread() const {
@@ -752,8 +752,15 @@
 
 void Connection::Destroy() {
   RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK(port_) << "Calling Destroy() twice?";
+  if (port_)
+    port_->DestroyConnection(this);
+}
+
+bool Connection::Shutdown() {
+  RTC_DCHECK_RUN_ON(network_thread_);
   if (!port_)
-    return;
+    return false;  // already shut down.
 
   RTC_DLOG(LS_VERBOSE) << ToString() << ": Connection destroyed";
 
@@ -761,8 +768,9 @@
   // intentionally to avoid a situation whereby the signal might have dangling
   // pointers to objects that have been deleted by the time the async task
   // that deletes the connection object runs.
-  SignalDestroyed(this);
+  auto destroyed_signals = SignalDestroyed;
   SignalDestroyed.disconnect_all();
+  destroyed_signals(this);
 
   LogCandidatePairConfig(webrtc::IceCandidatePairConfigType::kDestroyed);
 
@@ -770,20 +778,7 @@
   // information required for logging needs access to `port_`.
   port_.reset();
 
-  // Unwind the stack before deleting the object in case upstream callers
-  // need to refer to the Connection's state as part of teardown.
-  // NOTE: We move ownership of 'this' into the capture section of the lambda
-  // so that the object will always be deleted, including if PostTask fails.
-  // In such a case (only tests), deletion would happen inside of the call
-  // to `Destroy()`.
-  network_thread_->PostTask(
-      webrtc::ToQueuedTask([me = absl::WrapUnique(this)]() {}));
-}
-
-void Connection::FailAndDestroy() {
-  RTC_DCHECK_RUN_ON(network_thread_);
-  set_state(IceCandidatePairState::FAILED);
-  Destroy();
+  return true;
 }
 
 void Connection::FailAndPrune() {
@@ -793,9 +788,9 @@
   // and Connection. In some cases (Port dtor), a Connection object is deleted
   // without using the `Destroy` method (port_ won't be nulled and some
   // functionality won't run as expected), while in other cases
-  // (AddOrReplaceConnection), the Connection object is deleted asynchronously
-  // via the `Destroy` method and in that case `port_` will be nulled.
-  // However, in that case, there's a chance that the Port object gets
+  // the Connection object is deleted asynchronously and in that case `port_`
+  // will be nulled.
+  // In such a case, there's a chance that the Port object gets
   // deleted before the Connection object ends up being deleted.
   if (!port_)
     return;
@@ -833,6 +828,9 @@
 
 void Connection::UpdateState(int64_t now) {
   RTC_DCHECK_RUN_ON(network_thread_);
+  if (!port_)
+    return;
+
   int rtt = ConservativeRTTEstimate(rtt_);
 
   if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) {
@@ -885,7 +883,7 @@
   // Update the receiving state.
   UpdateReceiving(now);
   if (dead(now)) {
-    Destroy();
+    port_->DestroyConnectionAsync(this);
   }
 }
 
@@ -905,6 +903,9 @@
 
 void Connection::Ping(int64_t now) {
   RTC_DCHECK_RUN_ON(network_thread_);
+  if (!port_)
+    return;
+
   last_ping_sent_ = now;
 
   // If not using renomination, we use "1" to mean "nominated" and "0" to mean
@@ -1364,6 +1365,9 @@
 
 void Connection::OnConnectionRequestErrorResponse(ConnectionRequest* request,
                                                   StunMessage* response) {
+  if (!port_)
+    return;
+
   int error_code = response->GetErrorCodeValue();
   RTC_LOG(LS_WARNING) << ToString() << ": Received "
                       << StunMethodToString(response->type())
@@ -1377,7 +1381,7 @@
       error_code == STUN_ERROR_UNAUTHORIZED) {
     // Recoverable error, retry
   } else if (error_code == STUN_ERROR_ROLE_CONFLICT) {
-    HandleRoleConflictFromPeer();
+    port_->SignalRoleConflict(port_.get());
   } else if (request->msg()->type() == GOOG_PING_REQUEST) {
     // Race, retry.
   } else {
@@ -1385,7 +1389,8 @@
     RTC_LOG(LS_ERROR) << ToString()
                       << ": Received STUN error response, code=" << error_code
                       << "; killing connection";
-    FailAndDestroy();
+    set_state(IceCandidatePairState::FAILED);
+    port_->DestroyConnectionAsync(this);
   }
 }
 
@@ -1414,10 +1419,6 @@
   }
 }
 
-void Connection::HandleRoleConflictFromPeer() {
-  port_->SignalRoleConflict(port());
-}
-
 IceCandidatePairState Connection::state() const {
   RTC_DCHECK_RUN_ON(network_thread_);
   return state_;
@@ -1496,17 +1497,22 @@
   stats_.rtt = rtt_;
   stats_.key = this;
   stats_.state = state_;
-  stats_.priority = priority();
+  if (port_) {
+    stats_.priority = priority();
+    stats_.local_candidate = local_candidate();
+  }
   stats_.nominated = nominated();
   stats_.total_round_trip_time_ms = total_round_trip_time_ms_;
   stats_.current_round_trip_time_ms = current_round_trip_time_ms_;
-  stats_.local_candidate = local_candidate();
   stats_.remote_candidate = remote_candidate();
   return stats_;
 }
 
 void Connection::MaybeUpdateLocalCandidate(StunRequest* request,
                                            StunMessage* response) {
+  if (!port_)
+    return;
+
   // RFC 5245
   // The agent checks the mapped address from the STUN response.  If the
   // transport address does not match any of the local candidates that the
@@ -1648,6 +1654,9 @@
 int ProxyConnection::Send(const void* data,
                           size_t size,
                           const rtc::PacketOptions& options) {
+  if (!port_)
+    return SOCKET_ERROR;
+
   stats_.sent_total_packets++;
   int sent =
       port_->SendTo(data, size, remote_candidate_.address(), options, true);
diff --git a/p2p/base/connection.h b/p2p/base/connection.h
index c654e0d..9e19d07 100644
--- a/p2p/base/connection.h
+++ b/p2p/base/connection.h
@@ -179,11 +179,17 @@
   int receiving_timeout() const;
   void set_receiving_timeout(absl::optional<int> receiving_timeout_ms);
 
-  // Makes the connection go away.
+  // Deletes a `Connection` instance is by calling the `DestroyConnection`
+  // method in `Port`.
+  // Note: When the function returns, the object has been deleted.
   void Destroy();
 
-  // Makes the connection go away, in a failed state.
-  void FailAndDestroy();
+  // Signals object destruction, releases outstanding references and performs
+  // final logging.
+  // The function will return `true` when shutdown was performed, signals
+  // emitted and outstanding references released. If the function returns
+  // `false`, `Shutdown()` has previously been called.
+  bool Shutdown();
 
   // Prunes the connection and sets its state to STATE_FAILED,
   // It will not be used or send pings although it can still receive packets.
@@ -249,9 +255,6 @@
   // controlling side.
   sigslot::signal1<Connection*> SignalNominated;
 
-  // Invoked when Connection receives STUN error response with 487 code.
-  void HandleRoleConflictFromPeer();
-
   IceCandidatePairState state() const;
 
   int num_pings_sent() const;
diff --git a/p2p/base/p2p_transport_channel.cc b/p2p/base/p2p_transport_channel.cc
index 692d4dc..3afd0d3 100644
--- a/p2p/base/p2p_transport_channel.cc
+++ b/p2p/base/p2p_transport_channel.cc
@@ -220,9 +220,10 @@
   TRACE_EVENT0("webrtc", "P2PTransportChannel::~P2PTransportChannel");
   RTC_DCHECK_RUN_ON(network_thread_);
   std::vector<Connection*> copy(connections().begin(), connections().end());
-  for (Connection* con : copy) {
-    con->SignalDestroyed.disconnect(this);
-    con->Destroy();
+  for (Connection* connection : copy) {
+    connection->SignalDestroyed.disconnect(this);
+    ice_controller_->OnConnectionDestroyed(connection);
+    connection->Destroy();
   }
   resolvers_.clear();
 }
diff --git a/p2p/base/port.cc b/p2p/base/port.cc
index fe1a98a..d9584e3 100644
--- a/p2p/base/port.cc
+++ b/p2p/base/port.cc
@@ -18,6 +18,7 @@
 #include <vector>
 
 #include "absl/algorithm/container.h"
+#include "absl/memory/memory.h"
 #include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
 #include "p2p/base/connection.h"
@@ -185,22 +186,7 @@
 Port::~Port() {
   RTC_DCHECK_RUN_ON(thread_);
   CancelPendingTasks();
-
-  // Delete all of the remaining connections.  We copy the list up front
-  // because each deletion will cause it to be modified.
-
-  std::vector<Connection*> list;
-
-  AddressMap::iterator iter = connections_.begin();
-  while (iter != connections_.end()) {
-    list.push_back(iter->second);
-    ++iter;
-  }
-
-  for (uint32_t i = 0; i < list.size(); i++) {
-    list[i]->SignalDestroyed.disconnect(this);
-    delete list[i];
-  }
+  DestroyAllConnections();
 }
 
 const std::string& Port::Type() const {
@@ -360,11 +346,11 @@
         << ": A new connection was created on an existing remote address. "
            "New remote candidate: "
         << conn->remote_candidate().ToSensitiveString();
-    ret.first->second->SignalDestroyed.disconnect(this);
-    ret.first->second->Destroy();
+    std::unique_ptr<Connection> old_conn = absl::WrapUnique(ret.first->second);
     ret.first->second = conn;
+    HandleConnectionDestroyed(old_conn.get());
+    old_conn->Shutdown();
   }
-  conn->SignalDestroyed.connect(this, &Port::OnConnectionDestroyed);
 }
 
 void Port::OnReadPacket(const char* data,
@@ -622,9 +608,9 @@
 
 void Port::DestroyAllConnections() {
   RTC_DCHECK_RUN_ON(thread_);
-  for (auto kv : connections_) {
-    kv.second->SignalDestroyed.disconnect(this);
-    kv.second->Destroy();
+  for (auto& [unused, connection] : connections_) {
+    connection->Shutdown();
+    delete connection;
   }
   connections_.clear();
 }
@@ -904,11 +890,15 @@
   enable_port_packets_ = true;
 }
 
-void Port::OnConnectionDestroyed(Connection* conn) {
-  AddressMap::iterator iter =
-      connections_.find(conn->remote_candidate().address());
-  RTC_DCHECK(iter != connections_.end());
-  connections_.erase(iter);
+bool Port::OnConnectionDestroyed(Connection* conn) {
+  if (connections_.erase(conn->remote_candidate().address()) == 0) {
+    // This could indicate a programmer error outside of webrtc so while we
+    // do have this check here to alert external developers, we also need to
+    // handle it since it might be a corner case not caught in tests.
+    RTC_DCHECK_NOTREACHED() << "Calling Destroy recursively?";
+    return false;
+  }
+
   HandleConnectionDestroyed(conn);
 
   // Ports time out after all connections fail if it is not marked as
@@ -921,6 +911,28 @@
     thread_->PostDelayed(RTC_FROM_HERE, timeout_delay_, this,
                          MSG_DESTROY_IF_DEAD);
   }
+
+  return true;
+}
+
+void Port::DestroyConnectionInternal(Connection* conn, bool async) {
+  RTC_DCHECK_RUN_ON(thread_);
+  if (!OnConnectionDestroyed(conn))
+    return;
+
+  conn->Shutdown();
+  if (async) {
+    // Unwind the stack before deleting the object in case upstream callers
+    // need to refer to the Connection's state as part of teardown.
+    // NOTE: We move ownership of `conn` into the capture section of the lambda
+    // so that the object will always be deleted, including if PostTask fails.
+    // In such a case (only tests), deletion would happen inside of the call
+    // to `DestroyConnection()`.
+    thread_->PostTask(
+        webrtc::ToQueuedTask([conn = absl::WrapUnique(conn)]() {}));
+  } else {
+    delete conn;
+  }
 }
 
 void Port::Destroy() {
diff --git a/p2p/base/port.h b/p2p/base/port.h
index b3c3d85..fca5619 100644
--- a/p2p/base/port.h
+++ b/p2p/base/port.h
@@ -296,6 +296,19 @@
   // Returns the connection to the given address or NULL if none exists.
   Connection* GetConnection(const rtc::SocketAddress& remote_addr) override;
 
+  // Removes and deletes a connection object. `DestroyConnection` will
+  // delete the connection object directly whereas `DestroyConnectionAsync`
+  // defers the `delete` operation to when the call stack has been unwound.
+  // Async may be needed when deleting a connection object from within a
+  // callback.
+  void DestroyConnection(Connection* conn) {
+    DestroyConnectionInternal(conn, false);
+  }
+
+  void DestroyConnectionAsync(Connection* conn) {
+    DestroyConnectionInternal(conn, true);
+  }
+
   // In a shared socket mode each port which shares the socket will decide
   // to accept the packet based on the `remote_addr`. Currently only UDP
   // port implemented this method.
@@ -454,8 +467,19 @@
 
  private:
   void Construct();
-  // Called when one of our connections deletes itself.
-  void OnConnectionDestroyed(Connection* conn);
+
+  // Called internally when deleting a connection object.
+  // Returns true if the connection object was removed from the `connections_`
+  // list and the state updated accordingly. If the connection was not found
+  // in the list, the return value is false. Note that this may indicate
+  // incorrect behavior of external code that might be attempting to delete
+  // connection objects from within a 'on destroyed' callback notification
+  // for the connection object itself.
+  bool OnConnectionDestroyed(Connection* conn);
+
+  // Private implementation of DestroyConnection to keep the async usage
+  // distinct.
+  void DestroyConnectionInternal(Connection* conn, bool async);
 
   void OnNetworkTypeChanged(const rtc::Network* network);
 
diff --git a/p2p/base/port_unittest.cc b/p2p/base/port_unittest.cc
index 5b2ac8a..22d58ff 100644
--- a/p2p/base/port_unittest.cc
+++ b/p2p/base/port_unittest.cc
@@ -274,11 +274,7 @@
         [this](PortInterface* port) { OnSrcPortDestroyed(port); });
   }
 
-  ~TestChannel() {
-    if (conn_) {
-      conn_->SignalDestroyed.disconnect(this);
-    }
-  }
+  ~TestChannel() { Stop(); }
 
   int complete_count() { return complete_count_; }
   Connection* conn() { return conn_; }
@@ -287,6 +283,7 @@
 
   void Start() { port_->PrepareAddress(); }
   void CreateConnection(const Candidate& remote_candidate) {
+    RTC_DCHECK(!conn_);
     conn_ = port_->CreateConnection(remote_candidate, Port::ORIGIN_MESSAGE);
     IceMode remote_ice_mode =
         (ice_mode_ == ICEMODE_FULL) ? ICEMODE_LITE : ICEMODE_FULL;
@@ -298,6 +295,7 @@
                                      &TestChannel::OnConnectionReadyToSend);
     connection_ready_to_send_ = false;
   }
+
   void OnConnectionStateChange(Connection* conn) {
     if (conn->write_state() == Connection::STATE_WRITABLE) {
       conn->set_use_candidate_attr(true);
@@ -305,6 +303,10 @@
     }
   }
   void AcceptConnection(const Candidate& remote_candidate) {
+    if (conn_) {
+      conn_->SignalDestroyed.disconnect(this);
+      conn_ = nullptr;
+    }
     ASSERT_TRUE(remote_request_.get() != NULL);
     Candidate c = remote_candidate;
     c.set_address(remote_address_);
@@ -317,8 +319,7 @@
   void Ping(int64_t now) { conn_->Ping(now); }
   void Stop() {
     if (conn_) {
-      conn_->SignalDestroyed.disconnect(this);
-      conn_->Destroy();
+      port_->DestroyConnection(conn_);
       conn_ = nullptr;
     }
   }
@@ -358,7 +359,7 @@
   void OnDestroyed(Connection* conn) {
     ASSERT_EQ(conn_, conn);
     RTC_LOG(LS_INFO) << "OnDestroy connection " << conn << " deleted";
-    conn_ = NULL;
+    conn_ = nullptr;
     // When the connection is destroyed, also clear these fields so future
     // connections are possible.
     remote_request_.reset();
@@ -677,7 +678,7 @@
 
     // Speed up destroying ch2's connection such that the test is ready to
     // accept a new connection from ch1 before ch1's connection destroys itself.
-    ch2->conn()->Destroy();
+    ch2->Stop();
     EXPECT_TRUE_WAIT(ch2->conn() == NULL, kDefaultTimeout);
   }
 
diff --git a/p2p/base/tcp_port.cc b/p2p/base/tcp_port.cc
index ee644eb..d04ec8d 100644
--- a/p2p/base/tcp_port.cc
+++ b/p2p/base/tcp_port.cc
@@ -491,6 +491,8 @@
   RTC_DCHECK(socket == socket_.get());
   RTC_LOG(LS_INFO) << ToString() << ": Connection closed with error " << error;
 
+  RTC_DCHECK(port());
+
   // Guard against the condition where IPC socket will call OnClose for every
   // packet it can't send.
   if (connected()) {
@@ -520,7 +522,7 @@
     // to manually destroy here as this connection, as never connected, will not
     // be scheduled for ping to trigger destroy.
     socket_->UnsubscribeClose(this);
-    Destroy();
+    port()->DestroyConnectionAsync(this);
   }
 }
 
diff --git a/p2p/base/turn_port.h b/p2p/base/turn_port.h
index 143208a..2d9166e 100644
--- a/p2p/base/turn_port.h
+++ b/p2p/base/turn_port.h
@@ -201,12 +201,11 @@
   // Finds the turn entry with `address` and sets its channel id.
   // Returns true if the entry is found.
   bool SetEntryChannelId(const rtc::SocketAddress& address, int channel_id);
-  // Visible for testing.
-  // Shuts down the turn port, usually because of some fatal errors.
-  void Close();
 
   void HandleConnectionDestroyed(Connection* conn) override;
 
+  void CloseForTest() { Close(); }
+
  protected:
   TurnPort(rtc::Thread* thread,
            rtc::PacketSocketFactory* factory,
@@ -249,6 +248,9 @@
 
   rtc::DiffServCodePoint StunDscpValue() const override;
 
+  // Shuts down the turn port, frees requests and deletes connections.
+  void Close();
+
  private:
   enum {
     MSG_ALLOCATE_ERROR = MSG_FIRST_AVAILABLE,
diff --git a/p2p/base/turn_port_unittest.cc b/p2p/base/turn_port_unittest.cc
index 2d40e97..ecde0ce 100644
--- a/p2p/base/turn_port_unittest.cc
+++ b/p2p/base/turn_port_unittest.cc
@@ -664,7 +664,8 @@
 
     // Destroy the connection on the TURN port. The TurnEntry still exists, so
     // the TURN port should still process a ping from an unknown address.
-    conn2->Destroy();
+    turn_port_->DestroyConnection(conn2);
+
     conn1->Ping(0);
     EXPECT_TRUE_SIMULATED_WAIT(turn_unknown_address_, kSimulatedRtt,
                                fake_clock_);
@@ -1268,7 +1269,7 @@
   EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn2->write_state(),
                            kSimulatedRtt * 2, fake_clock_);
 
-  turn_port_->Close();
+  turn_port_->CloseForTest();
   SIMULATED_WAIT(false, kSimulatedRtt, fake_clock_);
   turn_unknown_address_ = false;
   conn2->Ping(0);