Reland of Do not delete a connection in the turn port with permission error or refresh error. (patchset #1 id:1 of https://codereview.webrtc.org/2090833002/ )

Reason for revert:
The Webrtc waterfall indicates that this revert is not necessary.

Original issue's description:
> Revert of Do not delete a connection in the turn port with permission error or refresh error. (patchset #6 id:260001 of https://codereview.webrtc.org/2068263003/ )
>
> Reason for revert:
> It broke webrtc builds.
>
> Original issue's description:
> > Do not delete a connection in the turn port with permission error,  refresh error, or binding error.
> >
> > Even if those error happened, the connection may still be able to receive packets for a while.
> > If we delete the connections, all packets arriving will be dropped.
> >
> > BUG=webrtc:6007
> > R=deadbeef@webrtc.org, pthatcher@webrtc.org
> >
> > Committed: https://crrev.com/3d77deb29c15bfb8f794ef3413837a0ec0f0c131
> > Cr-Commit-Position: refs/heads/master@{#13262}
>
> NOPRESUBMIT=true
> NOTREECHECKS=true
> NOTRY=true
> TBR=pthatcher@webrtc.org,deadbeef@webrtc.org
> # Not skipping CQ checks because original CL landed more than 1 days ago.
> BUG=webrtc:6007
>
> Committed: https://crrev.com/3159ffae6b1d5cba2ad972bd3d8074ac85f2c7f9
> Cr-Commit-Position: refs/heads/master@{#13265}

TBR=pthatcher@webrtc.org,deadbeef@webrtc.org
# Skipping CQ checks because original CL landed less than 1 days ago.
NOPRESUBMIT=true
NOTREECHECKS=true
NOTRY=true
BUG=webrtc:6007

Review-Url: https://codereview.webrtc.org/2090073003
Cr-Original-Commit-Position: refs/heads/master@{#13266}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: 079a7a197f0c1f79d5432a5c5058744965727fec
diff --git a/base/virtualsocketserver.cc b/base/virtualsocketserver.cc
index 8b8d050..d871d4c 100644
--- a/base/virtualsocketserver.cc
+++ b/base/virtualsocketserver.cc
@@ -511,7 +511,7 @@
       server_owned_(false),
       msg_queue_(NULL),
       stop_on_idle_(false),
-      network_delay_(TimeMillis()),
+      network_delay_(0),
       next_ipv4_(kInitialNextIPv4),
       next_ipv6_(kInitialNextIPv6),
       next_port_(kFirstEphemeralPort),
diff --git a/p2p/base/p2ptransportchannel.cc b/p2p/base/p2ptransportchannel.cc
index 78cb704..5c1f85d 100644
--- a/p2p/base/p2ptransportchannel.cc
+++ b/p2p/base/p2ptransportchannel.cc
@@ -1349,6 +1349,11 @@
     return false;
   }
 
+  // A failed connection will not be pinged.
+  if (conn->state() == Connection::STATE_FAILED) {
+    return false;
+  }
+
   // An never connected connection cannot be written to at all, so pinging is
   // out of the question. However, if it has become WRITABLE, it is in the
   // reconnecting state so ping is needed.
diff --git a/p2p/base/p2ptransportchannel_unittest.cc b/p2p/base/p2ptransportchannel_unittest.cc
index e5e63a3..87be379 100644
--- a/p2p/base/p2ptransportchannel_unittest.cc
+++ b/p2p/base/p2ptransportchannel_unittest.cc
@@ -2278,6 +2278,24 @@
   EXPECT_EQ(conn2, FindNextPingableConnectionAndPingIt(&ch));
 }
 
+TEST_F(P2PTransportChannelPingTest, TestFailedConnectionNotPingable) {
+  cricket::FakePortAllocator pa(rtc::Thread::Current(), nullptr);
+  cricket::P2PTransportChannel ch("Do not ping failed connections", 1, &pa);
+  PrepareChannel(&ch);
+  ch.Connect();
+  ch.MaybeStartGathering();
+  ch.AddRemoteCandidate(CreateHostCandidate("1.1.1.1", 1, 1));
+
+  cricket::Connection* conn1 = WaitForConnectionTo(&ch, "1.1.1.1", 1);
+  ASSERT_TRUE(conn1 != nullptr);
+
+  EXPECT_EQ(conn1, ch.FindNextPingableConnection());
+  conn1->Prune();  // A pruned connection may still be pingable.
+  EXPECT_EQ(conn1, ch.FindNextPingableConnection());
+  conn1->FailAndPrune();
+  EXPECT_TRUE(nullptr == ch.FindNextPingableConnection());
+}
+
 TEST_F(P2PTransportChannelPingTest, TestSignalStateChanged) {
   cricket::FakePortAllocator pa(rtc::Thread::Current(), nullptr);
   cricket::P2PTransportChannel ch("state change", 1, &pa);
diff --git a/p2p/base/port.cc b/p2p/base/port.cc
index 6e26371..d4dfd91 100644
--- a/p2p/base/port.cc
+++ b/p2p/base/port.cc
@@ -1090,6 +1090,11 @@
   Destroy();
 }
 
+void Connection::FailAndPrune() {
+  set_state(Connection::STATE_FAILED);
+  Prune();
+}
+
 void Connection::PrintPingsSinceLastResponse(std::string* s, size_t max) {
   std::ostringstream oss;
   oss << std::boolalpha;
diff --git a/p2p/base/port.h b/p2p/base/port.h
index 65932b9..5d95b80 100644
--- a/p2p/base/port.h
+++ b/p2p/base/port.h
@@ -523,6 +523,10 @@
   // Makes the connection go away, in a failed state.
   void FailAndDestroy();
 
+  // Prunes the connection and sets its state to STATE_FAILED,
+  // It will not be used or send pings although it can still receive packets.
+  void FailAndPrune();
+
   // Checks that the state of this connection is up-to-date.  The argument is
   // the current time, which is compared against various timeouts.
   void UpdateState(int64_t now);
diff --git a/p2p/base/turnport.cc b/p2p/base/turnport.cc
index ff0c292..4036811 100644
--- a/p2p/base/turnport.cc
+++ b/p2p/base/turnport.cc
@@ -449,7 +449,7 @@
     return NULL;
   }
 
-  if (state_ == STATE_DISCONNECTED) {
+  if (state_ == STATE_DISCONNECTED || state_ == STATE_RECEIVEONLY) {
     return NULL;
   }
 
@@ -469,10 +469,10 @@
   return NULL;
 }
 
-bool TurnPort::DestroyConnection(const rtc::SocketAddress& address) {
+bool TurnPort::FailAndPruneConnection(const rtc::SocketAddress& address) {
   Connection* conn = GetConnection(address);
   if (conn != nullptr) {
-    conn->Destroy();
+    conn->FailAndPrune();
     return true;
   }
   return false;
@@ -561,7 +561,7 @@
 
   if (state_ == STATE_DISCONNECTED) {
     LOG_J(LS_WARNING, this)
-        << "Received TURN message while the Turn port is disconnected";
+        << "Received TURN message while the TURN port is disconnected";
     return false;
   }
 
@@ -739,13 +739,22 @@
   thread()->Post(RTC_FROM_HERE, this, MSG_ALLOCATE_ERROR);
 }
 
-void TurnPort::OnTurnRefreshError() {
-  // Need to Close the port asynchronously because otherwise, the refresh
+void TurnPort::OnRefreshError() {
+  // Need to clear the requests asynchronously because otherwise, the refresh
   // request may be deleted twice: once at the end of the message processing
-  // and the other in Close().
+  // and the other in HandleRefreshError().
   thread()->Post(RTC_FROM_HERE, this, MSG_REFRESH_ERROR);
 }
 
+void TurnPort::HandleRefreshError() {
+  request_manager_.Clear();
+  state_ = STATE_RECEIVEONLY;
+  // Fail and prune all connections; stop sending data.
+  for (auto kv : connections()) {
+    kv.second->FailAndPrune();
+  }
+}
+
 void TurnPort::Close() {
   if (!ready()) {
     OnAllocateError();
@@ -768,7 +777,7 @@
       OnAllocateMismatch();
       break;
     case MSG_REFRESH_ERROR:
-      Close();
+      HandleRefreshError();
       break;
     case MSG_TRY_ALTERNATE_SERVER:
       if (server_address().proto == PROTO_UDP) {
@@ -1277,14 +1286,14 @@
                              << ", id=" << rtc::hex_encode(id())
                              << ", code=" << error_code->code()
                              << ", rtt=" << Elapsed();
-    port_->OnTurnRefreshError();
+    port_->OnRefreshError();
     port_->SignalTurnRefreshResult(port_, error_code->code());
   }
 }
 
 void TurnRefreshRequest::OnTimeout() {
   LOG_J(LS_WARNING, port_) << "TURN refresh timeout " << rtc::hex_encode(id());
-  port_->OnTurnRefreshError();
+  port_->OnRefreshError();
 }
 
 TurnCreatePermissionRequest::TurnCreatePermissionRequest(
@@ -1491,20 +1500,18 @@
       SendCreatePermissionRequest(0);
     }
   } else {
-    port_->DestroyConnection(ext_addr_);
+    bool found = port_->FailAndPruneConnection(ext_addr_);
+    if (found) {
+      LOG(LS_ERROR) << "Received TURN CreatePermission error response, "
+                    << "code=" << code << "; pruned connection.";
+    }
     // Send signal with error code.
     port_->SignalCreatePermissionResult(port_, ext_addr_, code);
-    Connection* c = port_->GetConnection(ext_addr_);
-    if (c) {
-      LOG_J(LS_ERROR, c) << "Received TURN CreatePermission error response, "
-                         << "code=" << code << "; killing connection.";
-      c->FailAndDestroy();
-    }
   }
 }
 
 void TurnEntry::OnCreatePermissionTimeout() {
-  port_->DestroyConnection(ext_addr_);
+  port_->FailAndPruneConnection(ext_addr_);
 }
 
 void TurnEntry::OnChannelBindSuccess() {
@@ -1516,8 +1523,8 @@
 
 void TurnEntry::OnChannelBindError(StunMessage* response, int code) {
   // If the channel bind fails due to errors other than STATE_NONCE,
-  // we just destroy the connection and rely on ICE restart to re-establish
-  // the connection.
+  // we will fail and prune the connection and rely on ICE restart to
+  // re-establish a new connection if needed.
   if (code == STUN_ERROR_STALE_NONCE) {
     if (port_->UpdateNonce(response)) {
       // Send channel bind request with fresh nonce.
@@ -1525,11 +1532,11 @@
     }
   } else {
     state_ = STATE_UNBOUND;
-    port_->DestroyConnection(ext_addr_);
+    port_->FailAndPruneConnection(ext_addr_);
   }
 }
 void TurnEntry::OnChannelBindTimeout() {
   state_ = STATE_UNBOUND;
-  port_->DestroyConnection(ext_addr_);
+  port_->FailAndPruneConnection(ext_addr_);
 }
 }  // namespace cricket
diff --git a/p2p/base/turnport.h b/p2p/base/turnport.h
index 17d2d45..6e528aa 100644
--- a/p2p/base/turnport.h
+++ b/p2p/base/turnport.h
@@ -38,7 +38,9 @@
     STATE_CONNECTING,    // Initial state, cannot send any packets.
     STATE_CONNECTED,     // Socket connected, ready to send stun requests.
     STATE_READY,         // Received allocate success, can send any packets.
-    STATE_DISCONNECTED,  // TCP connection died, cannot send any packets.
+    STATE_RECEIVEONLY,   // Had REFRESH_REQUEST error, cannot send any packets.
+    STATE_DISCONNECTED,  // TCP connection died, cannot send/receive any
+                         // packets.
   };
   static TurnPort* Create(rtc::Thread* thread,
                           rtc::PacketSocketFactory* factory,
@@ -202,7 +204,8 @@
     }
   }
 
-  void OnTurnRefreshError();
+  void OnRefreshError();
+  void HandleRefreshError();
   bool SetAlternateServer(const rtc::SocketAddress& address);
   void ResolveTurnAddress(const rtc::SocketAddress& address);
   void OnResolveResult(rtc::AsyncResolverInterface* resolver);
@@ -245,9 +248,9 @@
   void ScheduleEntryDestruction(TurnEntry* entry);
   void CancelEntryDestruction(TurnEntry* entry);
 
-  // Destroys the connection with remote address |address|. Returns true if
-  // a connection is found and destroyed.
-  bool DestroyConnection(const rtc::SocketAddress& address);
+  // Marks the connection with remote address |address| failed and
+  // pruned (a.k.a. write-timed-out). Returns true if a connection is found.
+  bool FailAndPruneConnection(const rtc::SocketAddress& address);
 
   ProtocolAddress server_address_;
   RelayCredentials credentials_;
diff --git a/p2p/base/turnport_unittest.cc b/p2p/base/turnport_unittest.cc
index 0e415a2..d6d75aa 100644
--- a/p2p/base/turnport_unittest.cc
+++ b/p2p/base/turnport_unittest.cc
@@ -195,7 +195,6 @@
                        const rtc::PacketTime& packet_time) {
     udp_packets_.push_back(rtc::Buffer(data, size));
   }
-  void OnConnectionDestroyed(Connection* conn) { connection_destroyed_ = true; }
   void OnSocketReadPacket(rtc::AsyncPacketSocket* socket,
                           const char* data, size_t size,
                           const rtc::SocketAddress& remote_addr,
@@ -282,9 +281,6 @@
     turn_port_->SignalTurnRefreshResult.connect(
         this, &TurnPortTest::OnTurnRefreshResult);
   }
-  void ConnectConnectionDestroyedSignal(Connection* conn) {
-    conn->SignalDestroyed.connect(this, &TurnPortTest::OnConnectionDestroyed);
-  }
 
   void CreateUdpPort() { CreateUdpPort(kLocalAddr2); }
 
@@ -309,10 +305,23 @@
     ASSERT_TRUE_WAIT(udp_ready_, kTimeout);
   }
 
-  bool CheckConnectionDestroyed() {
-    turn_port_->FlushRequests(cricket::kAllRequests);
-    rtc::Thread::Current()->ProcessMessages(50);
-    return connection_destroyed_;
+  bool CheckConnectionFailedAndPruned(Connection* conn) {
+    return conn && !conn->active() && conn->state() == Connection::STATE_FAILED;
+  }
+
+  // Checks that |turn_port_| has a nonempty set of connections and they are all
+  // failed and pruned.
+  bool CheckAllConnectionsFailedAndPruned() {
+    auto& connections = turn_port_->connections();
+    if (connections.empty()) {
+      return false;
+    }
+    for (auto kv : connections) {
+      if (!CheckConnectionFailedAndPruned(kv.second)) {
+        return false;
+      }
+    }
+    return true;
   }
 
   void TestTurnAlternateServer(cricket::ProtocolType protocol_type) {
@@ -524,7 +533,6 @@
   bool udp_ready_;
   bool test_finish_;
   bool turn_refresh_success_ = false;
-  bool connection_destroyed_ = false;
   std::vector<rtc::Buffer> turn_packets_;
   std::vector<rtc::Buffer> udp_packets_;
   rtc::PacketOptions options;
@@ -733,6 +741,7 @@
 }
 
 TEST_F(TurnPortTest, TestRefreshRequestGetsErrorResponse) {
+  rtc::ScopedFakeClock clock;
   CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
   PrepareTurnAndUdpPorts();
   turn_port_->CreateConnection(udp_port_->Candidates()[0],
@@ -745,13 +754,14 @@
   // When this succeeds, it will schedule a new RefreshRequest with the bad
   // credential.
   turn_port_->FlushRequests(cricket::TURN_REFRESH_REQUEST);
-  EXPECT_TRUE_WAIT(turn_refresh_success_, kTimeout);
+  EXPECT_TRUE_SIMULATED_WAIT(turn_refresh_success_, kTimeout, clock);
   // Flush it again, it will receive a bad response.
   turn_port_->FlushRequests(cricket::TURN_REFRESH_REQUEST);
-  EXPECT_TRUE_WAIT(!turn_refresh_success_, kTimeout);
-  EXPECT_TRUE_WAIT(!turn_port_->connected(), kTimeout);
-  EXPECT_TRUE_WAIT(turn_port_->connections().empty(), kTimeout);
-  EXPECT_FALSE(turn_port_->HasRequests());
+  EXPECT_TRUE_SIMULATED_WAIT(!turn_refresh_success_, kTimeout, clock);
+  EXPECT_TRUE_SIMULATED_WAIT(!turn_port_->connected(), kTimeout, clock);
+  EXPECT_TRUE_SIMULATED_WAIT(CheckAllConnectionsFailedAndPruned(), kTimeout,
+                             clock);
+  EXPECT_TRUE_SIMULATED_WAIT(!turn_port_->HasRequests(), kTimeout, clock);
 }
 
 // Test that TurnPort will not handle any incoming packets once it has been
@@ -796,6 +806,20 @@
   ASSERT_TRUE(conn1 == NULL);
 }
 
+// Tests that when a TCP socket is closed, the respective TURN connection will
+// be destroyed.
+TEST_F(TurnPortTest, TestSocketCloseWillDestroyConnection) {
+  turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP);
+  CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr);
+  PrepareTurnAndUdpPorts();
+  Connection* conn = turn_port_->CreateConnection(udp_port_->Candidates()[0],
+                                                  Port::ORIGIN_MESSAGE);
+  EXPECT_NE(nullptr, conn);
+  EXPECT_TRUE(!turn_port_->connections().empty());
+  turn_port_->socket()->SignalClose(turn_port_->socket(), 1);
+  EXPECT_TRUE_WAIT(turn_port_->connections().empty(), kTimeout);
+}
+
 // Test try-alternate-server feature.
 TEST_F(TurnPortTest, TestTurnAlternateServerUDP) {
   TestTurnAlternateServer(cricket::PROTO_UDP);
@@ -899,9 +923,8 @@
 
   Connection* conn = turn_port_->CreateConnection(udp_port_->Candidates()[0],
                                                   Port::ORIGIN_MESSAGE);
-  ConnectConnectionDestroyedSignal(conn);
   ASSERT_TRUE(conn != NULL);
-  ASSERT_TRUE_WAIT(turn_create_permission_success_, kTimeout);
+  EXPECT_TRUE_WAIT(turn_create_permission_success_, kTimeout);
   turn_create_permission_success_ = false;
   // A create-permission-request should be pending.
   // After the next create-permission-response is received, it will schedule
@@ -909,14 +932,15 @@
   cricket::RelayCredentials bad_credentials("bad_user", "bad_pwd");
   turn_port_->set_credentials(bad_credentials);
   turn_port_->FlushRequests(cricket::kAllRequests);
-  ASSERT_TRUE_WAIT(turn_create_permission_success_, kTimeout);
+  EXPECT_TRUE_WAIT(turn_create_permission_success_, kTimeout);
   // Flush the requests again; the create-permission-request will fail.
   turn_port_->FlushRequests(cricket::kAllRequests);
   EXPECT_TRUE_WAIT(!turn_create_permission_success_, kTimeout);
-  EXPECT_TRUE_WAIT(connection_destroyed_, kTimeout);
+  EXPECT_TRUE_WAIT(CheckConnectionFailedAndPruned(conn), kTimeout);
 }
 
 TEST_F(TurnPortTest, TestChannelBindGetErrorResponse) {
+  rtc::ScopedFakeClock clock;
   CreateTurnPort(kTurnUsername, kTurnPassword, kTurnUdpProtoAddr);
   PrepareTurnAndUdpPorts();
   Connection* conn1 = turn_port_->CreateConnection(udp_port_->Candidates()[0],
@@ -924,18 +948,25 @@
   ASSERT_TRUE(conn1 != nullptr);
   Connection* conn2 = udp_port_->CreateConnection(turn_port_->Candidates()[0],
                                                   Port::ORIGIN_MESSAGE);
-  ASSERT_TRUE(conn2 != nullptr);
-  ConnectConnectionDestroyedSignal(conn1);
-  conn1->Ping(0);
-  ASSERT_TRUE_WAIT(conn1->writable(), kTimeout);
 
-  std::string data = "ABC";
-  conn1->Send(data.data(), data.length(), options);
+  ASSERT_TRUE(conn2 != nullptr);
+  conn1->Ping(0);
+  EXPECT_TRUE_SIMULATED_WAIT(conn1->writable(), kTimeout, clock);
   bool success =
       turn_port_->SetEntryChannelId(udp_port_->Candidates()[0].address(), -1);
   ASSERT_TRUE(success);
-  // Next time when the binding request is sent, it will get an ErrorResponse.
-  EXPECT_TRUE_WAIT(CheckConnectionDestroyed(), kTimeout);
+
+  std::string data = "ABC";
+  conn1->Send(data.data(), data.length(), options);
+
+  EXPECT_TRUE_SIMULATED_WAIT(CheckConnectionFailedAndPruned(conn1), kTimeout,
+                             clock);
+  // Verify that no packet can be sent after a bind request error.
+  conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
+                                  &TurnPortTest::OnUdpReadPacket);
+  conn1->Send(data.data(), data.length(), options);
+  SIMULATED_WAIT(!udp_packets_.empty(), kTimeout, clock);
+  EXPECT_TRUE(udp_packets_.empty());
 }
 
 // Do a TURN allocation, establish a UDP connection, and send some data.
@@ -995,26 +1026,32 @@
 }
 
 // Test that a CreatePermission failure will result in the connection being
-// destroyed.
-TEST_F(TurnPortTest, TestConnectionDestroyedOnCreatePermissionFailure) {
+// pruned and failed.
+TEST_F(TurnPortTest, TestConnectionFaildAndPrunedOnCreatePermissionFailure) {
+  rtc::ScopedFakeClock clock;
+  SIMULATED_WAIT(false, 101, clock);
   turn_server_.AddInternalSocket(kTurnTcpIntAddr, cricket::PROTO_TCP);
   turn_server_.server()->set_reject_private_addresses(true);
   CreateTurnPort(kTurnUsername, kTurnPassword, kTurnTcpProtoAddr);
   turn_port_->PrepareAddress();
-  ASSERT_TRUE_WAIT(turn_ready_, kTimeout);
+  EXPECT_TRUE_SIMULATED_WAIT(turn_ready_, kTimeout, clock);
 
   CreateUdpPort(SocketAddress("10.0.0.10", 0));
   udp_port_->PrepareAddress();
-  ASSERT_TRUE_WAIT(udp_ready_, kTimeout);
+  EXPECT_TRUE_SIMULATED_WAIT(udp_ready_, kTimeout, clock);
   // Create a connection.
   TestConnectionWrapper conn(turn_port_->CreateConnection(
       udp_port_->Candidates()[0], Port::ORIGIN_MESSAGE));
-  ASSERT_TRUE(conn.connection() != nullptr);
+  EXPECT_TRUE(conn.connection() != nullptr);
 
-  // Asynchronously, CreatePermission request should be sent and fail, closing
-  // the connection.
-  EXPECT_TRUE_WAIT(conn.connection() == nullptr, kTimeout);
-  EXPECT_FALSE(turn_create_permission_success_);
+  // Asynchronously, CreatePermission request should be sent and fail, which
+  // will make the connection pruned and failed.
+  EXPECT_TRUE_SIMULATED_WAIT(CheckConnectionFailedAndPruned(conn.connection()),
+                             kTimeout, clock);
+  EXPECT_TRUE_SIMULATED_WAIT(!turn_create_permission_success_, kTimeout, clock);
+  // Check that the connection is not deleted asynchronously.
+  SIMULATED_WAIT(conn.connection() == nullptr, kTimeout, clock);
+  EXPECT_TRUE(conn.connection() != nullptr);
 }
 
 // Test that a TURN allocation is released when the port is closed.