Implement proper SCTP data channel closing procedure.

The proper closing procedure is:
1. Alice resets outgoing stream.
2. Bob receives incoming stream reset, resets his outgoing stream.
3. Alice receives incoming stream reset; channel closed!
4. Bob receives acknowledgement of reset; channel closed!

https://tools.ietf.org/html/draft-ietf-rtcweb-data-channel-13#section-6.7

However, up until now we've been sending both an incoming and outgoing reset
from the side initiating the closing procedure, and doing nothing on the remote
side.

This means that if you call "Close" and the remote endpoint is using an old
version of WebRTC, the channel's state will be stuck at "closing" since the
remote endpoint won't send a reset. Which is already what happens when Firefox
is talking to Chrome.

This CL also fixes an issue where the DataChannel's state prematurely went to
"closed" before the closing procedure was complete. Which could result in a
new DataChannel attempting to re-use the ID and failing.

TBR=magjed@webrtc.org

Bug: chromium:449934, webrtc:4453
Change-Id: Ic1ba813e46538c6c65868961aae6a9780d68a5e2
Reviewed-on: https://webrtc-review.googlesource.com/79061
Reviewed-by: Taylor Brandstetter <deadbeef@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Commit-Queue: Taylor Brandstetter <deadbeef@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23478}
diff --git a/media/sctp/sctptransport.cc b/media/sctp/sctptransport.cc
index b345985..51b4d49 100644
--- a/media/sctp/sctptransport.cc
+++ b/media/sctp/sctptransport.cc
@@ -23,6 +23,7 @@
 #include <stdarg.h>
 #include <stdio.h>
 
+#include <algorithm>
 #include <memory>
 #include <sstream>
 
@@ -73,65 +74,6 @@
   PPID_TEXT_LAST = 51
 };
 
-typedef std::set<uint32_t> StreamSet;
-
-// Returns a comma-separated, human-readable list of the stream IDs in 's'
-std::string ListStreams(const StreamSet& s) {
-  std::stringstream result;
-  bool first = true;
-  for (StreamSet::const_iterator it = s.begin(); it != s.end(); ++it) {
-    if (!first) {
-      result << ", " << *it;
-    } else {
-      result << *it;
-      first = false;
-    }
-  }
-  return result.str();
-}
-
-// Returns a pipe-separated, human-readable list of the SCTP_STREAM_RESET
-// flags in 'flags'
-std::string ListFlags(int flags) {
-  std::stringstream result;
-  bool first = true;
-// Skip past the first 12 chars (strlen("SCTP_STREAM_"))
-#define MAKEFLAG(X) \
-  { X, #X + 12 }
-  struct flaginfo_t {
-    int value;
-    const char* name;
-  } flaginfo[] = {MAKEFLAG(SCTP_STREAM_RESET_INCOMING_SSN),
-                  MAKEFLAG(SCTP_STREAM_RESET_OUTGOING_SSN),
-                  MAKEFLAG(SCTP_STREAM_RESET_DENIED),
-                  MAKEFLAG(SCTP_STREAM_RESET_FAILED),
-                  MAKEFLAG(SCTP_STREAM_CHANGE_DENIED)};
-#undef MAKEFLAG
-  for (uint32_t i = 0; i < arraysize(flaginfo); ++i) {
-    if (flags & flaginfo[i].value) {
-      if (!first)
-        result << " | ";
-      result << flaginfo[i].name;
-      first = false;
-    }
-  }
-  return result.str();
-}
-
-// Returns a comma-separated, human-readable list of the integers in 'array'.
-// All 'num_elems' of them.
-std::string ListArray(const uint16_t* array, int num_elems) {
-  std::stringstream result;
-  for (int i = 0; i < num_elems; ++i) {
-    if (i) {
-      result << ", " << array[i];
-    } else {
-      result << array[i];
-    }
-  }
-  return result.str();
-}
-
 // Helper for logging SCTP messages.
 #if defined(__GNUC__)
 __attribute__((__format__(__printf__, 1, 2)))
@@ -472,43 +414,40 @@
                         << "Not adding data stream "
                         << "with sid=" << sid << " because sid is too high.";
     return false;
-  } else if (open_streams_.find(sid) != open_streams_.end()) {
+  }
+  auto it = stream_status_by_sid_.find(sid);
+  if (it == stream_status_by_sid_.end()) {
+    stream_status_by_sid_[sid] = StreamStatus();
+    return true;
+  }
+  if (it->second.is_open()) {
     RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): "
                         << "Not adding data stream "
                         << "with sid=" << sid
                         << " because stream is already open.";
     return false;
-  } else if (queued_reset_streams_.find(sid) != queued_reset_streams_.end() ||
-             sent_reset_streams_.find(sid) != sent_reset_streams_.end()) {
+  } else {
     RTC_LOG(LS_WARNING) << debug_name_ << "->OpenStream(...): "
                         << "Not adding data stream "
                         << " with sid=" << sid
                         << " because stream is still closing.";
     return false;
   }
-
-  open_streams_.insert(sid);
-  return true;
 }
 
 bool SctpTransport::ResetStream(int sid) {
   RTC_DCHECK_RUN_ON(network_thread_);
-  StreamSet::iterator found = open_streams_.find(sid);
-  if (found == open_streams_.end()) {
-    RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid << "): "
-                        << "stream not found.";
+
+  auto it = stream_status_by_sid_.find(sid);
+  if (it == stream_status_by_sid_.end() || !it->second.is_open()) {
+    RTC_LOG(LS_WARNING) << debug_name_ << "->ResetStream(" << sid
+                        << "): stream not open.";
     return false;
-  } else {
-    RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): "
-                        << "Removing and queuing RE-CONFIG chunk.";
-    open_streams_.erase(found);
   }
 
-  // SCTP won't let you have more than one stream reset pending at a time, but
-  // you can close multiple streams in a single reset.  So, we keep an internal
-  // queue of streams-to-reset, and send them as one reset message in
-  // SendQueuedStreamResets().
-  queued_reset_streams_.insert(sid);
+  RTC_LOG(LS_VERBOSE) << debug_name_ << "->ResetStream(" << sid << "): "
+                      << "Queuing RE-CONFIG chunk.";
+  it->second.closure_initiated = true;
 
   // Signal our stream-reset logic that it should try to send now, if it can.
   SendQueuedStreamResets();
@@ -534,12 +473,15 @@
     return false;
   }
 
-  if (params.type != DMT_CONTROL &&
-      open_streams_.find(params.sid) == open_streams_.end()) {
-    RTC_LOG(LS_WARNING) << debug_name_ << "->SendData(...): "
-                        << "Not sending data because sid is unknown: "
-                        << params.sid;
-    return false;
+  if (params.type != DMT_CONTROL) {
+    auto it = stream_status_by_sid_.find(params.sid);
+    if (it == stream_status_by_sid_.end() || !it->second.is_open()) {
+      RTC_LOG(LS_WARNING)
+          << debug_name_ << "->SendData(...): "
+          << "Not sending data because sid is unknown or closing: "
+          << params.sid;
+      return false;
+    }
   }
 
   // Send data using SCTP.
@@ -790,46 +732,63 @@
 
 bool SctpTransport::SendQueuedStreamResets() {
   RTC_DCHECK_RUN_ON(network_thread_);
-  if (!sent_reset_streams_.empty() || queued_reset_streams_.empty()) {
+
+  // Figure out how many streams need to be reset. We need to do this so we can
+  // allocate the right amount of memory for the sctp_reset_streams structure.
+  size_t num_streams = std::count_if(
+      stream_status_by_sid_.begin(), stream_status_by_sid_.end(),
+      [](const std::map<uint32_t, StreamStatus>::value_type& stream) {
+        return stream.second.need_outgoing_reset();
+      });
+  if (num_streams == 0) {
+    // Nothing to reset.
     return true;
   }
 
   RTC_LOG(LS_VERBOSE) << "SendQueuedStreamResets[" << debug_name_
-                      << "]: Sending [" << ListStreams(queued_reset_streams_)
-                      << "], Open: [" << ListStreams(open_streams_)
-                      << "], Sent: [" << ListStreams(sent_reset_streams_)
-                      << "]";
+                      << "]: Resetting " << num_streams << " outgoing streams.";
 
-  const size_t num_streams = queued_reset_streams_.size();
   const size_t num_bytes =
       sizeof(struct sctp_reset_streams) + (num_streams * sizeof(uint16_t));
-
   std::vector<uint8_t> reset_stream_buf(num_bytes, 0);
   struct sctp_reset_streams* resetp =
       reinterpret_cast<sctp_reset_streams*>(&reset_stream_buf[0]);
   resetp->srs_assoc_id = SCTP_ALL_ASSOC;
-  resetp->srs_flags = SCTP_STREAM_RESET_INCOMING | SCTP_STREAM_RESET_OUTGOING;
+  resetp->srs_flags = SCTP_STREAM_RESET_OUTGOING;
   resetp->srs_number_streams = rtc::checked_cast<uint16_t>(num_streams);
   int result_idx = 0;
-  for (StreamSet::iterator it = queued_reset_streams_.begin();
-       it != queued_reset_streams_.end(); ++it) {
-    resetp->srs_stream_list[result_idx++] = *it;
+
+  for (const std::map<uint32_t, StreamStatus>::value_type& stream :
+       stream_status_by_sid_) {
+    if (!stream.second.need_outgoing_reset()) {
+      continue;
+    }
+    resetp->srs_stream_list[result_idx++] = stream.first;
   }
 
   int ret =
       usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_RESET_STREAMS, resetp,
                          rtc::checked_cast<socklen_t>(reset_stream_buf.size()));
   if (ret < 0) {
-    RTC_LOG_ERRNO(LS_ERROR) << debug_name_
-                            << "->SendQueuedStreamResets(): "
-                               "Failed to send a stream reset for "
-                            << num_streams << " streams";
+    // Note that usrsctp only lets us have one reset in progress at a time
+    // (even though multiple streams can be reset at once). If this happens,
+    // SendQueuedStreamResets will end up called after the current in-progress
+    // reset finishes, in OnStreamResetEvent.
+    RTC_LOG_ERRNO(LS_WARNING) << debug_name_
+                              << "->SendQueuedStreamResets(): "
+                                 "Failed to send a stream reset for "
+                              << num_streams << " streams";
     return false;
   }
 
-  // sent_reset_streams_ is empty, and all the queued_reset_streams_ go into
-  // it now.
-  queued_reset_streams_.swap(sent_reset_streams_);
+  // Since the usrsctp call completed successfully, update our stream status
+  // map to note that we started the outgoing reset.
+  for (auto it = stream_status_by_sid_.begin();
+       it != stream_status_by_sid_.end(); ++it) {
+    if (it->second.need_outgoing_reset()) {
+      it->second.outgoing_reset_initiated = true;
+    }
+  }
   return true;
 }
 
@@ -1049,78 +1008,73 @@
 void SctpTransport::OnStreamResetEvent(
     const struct sctp_stream_reset_event* evt) {
   RTC_DCHECK_RUN_ON(network_thread_);
-  // A stream reset always involves two RE-CONFIG chunks for us -- we always
-  // simultaneously reset a sid's sequence number in both directions.  The
-  // requesting side transmits a RE-CONFIG chunk and waits for the peer to send
-  // one back.  Both sides get this SCTP_STREAM_RESET_EVENT when they receive
-  // RE-CONFIGs.
+
+  // This callback indicates that a reset is complete for incoming and/or
+  // outgoing streams. The reset may have been initiated by us or the remote
+  // side.
   const int num_sids = (evt->strreset_length - sizeof(*evt)) /
                        sizeof(evt->strreset_stream_list[0]);
-  RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
-                      << "): Flags = 0x" << rtc::ToHex(evt->strreset_flags)
-                      << " (" << ListFlags(evt->strreset_flags) << ")";
-  RTC_LOG(LS_VERBOSE) << "Assoc = " << evt->strreset_assoc_id << ", Streams = ["
-                      << ListArray(evt->strreset_stream_list, num_sids)
-                      << "], Open: [" << ListStreams(open_streams_)
-                      << "], Q'd: [" << ListStreams(queued_reset_streams_)
-                      << "], Sent: [" << ListStreams(sent_reset_streams_)
-                      << "]";
 
-  // If both sides try to reset some streams at the same time (even if they're
-  // disjoint sets), we can get reset failures.
   if (evt->strreset_flags & SCTP_STREAM_RESET_FAILED) {
-    // OK, just try again.  The stream IDs sent over when the RESET_FAILED flag
-    // is set seem to be garbage values.  Ignore them.
-    queued_reset_streams_.insert(sent_reset_streams_.begin(),
-                                 sent_reset_streams_.end());
-    sent_reset_streams_.clear();
+    // OK, just try sending any previously sent stream resets again. The stream
+    // IDs sent over when the RESET_FIALED flag is set seem to be garbage
+    // values. Ignore them.
+    for (std::map<uint32_t, StreamStatus>::value_type& stream :
+         stream_status_by_sid_) {
+      stream.second.outgoing_reset_initiated = false;
+    }
+    SendQueuedStreamResets();
+    // TODO(deadbeef): If this happens, the entire SCTP association is in quite
+    // crippled state. The SCTP session should be dismantled, and the WebRTC
+    // connectivity errored because is clear that the distant party is not
+    // playing ball: malforms the transported data.
+    return;
+  }
 
-  } else if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
-    // Each side gets an event for each direction of a stream.  That is,
-    // closing sid k will make each side receive INCOMING and OUTGOING reset
-    // events for k.  As per RFC6525, Section 5, paragraph 2, each side will
-    // get an INCOMING event first.
-    for (int i = 0; i < num_sids; i++) {
-      const int stream_id = evt->strreset_stream_list[i];
+  // Loop over the received events and properly update the StreamStatus map.
+  for (int i = 0; i < num_sids; i++) {
+    const uint32_t sid = evt->strreset_stream_list[i];
+    auto it = stream_status_by_sid_.find(sid);
+    if (it == stream_status_by_sid_.end()) {
+      // This stream is unknown. Sometimes this can be from a
+      // RESET_FAILED-related retransmit.
+      RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
+                          << "): Unknown sid " << sid;
+      continue;
+    }
+    StreamStatus& status = it->second;
 
-      // See if this stream ID was closed by our peer or ourselves.
-      StreamSet::iterator it = sent_reset_streams_.find(stream_id);
-
-      // The reset was requested locally.
-      if (it != sent_reset_streams_.end()) {
-        RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
-                            << "): local sid " << stream_id << " acknowledged.";
-        sent_reset_streams_.erase(it);
-
-      } else if ((it = open_streams_.find(stream_id)) != open_streams_.end()) {
-        // The peer requested the reset.
-        RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
-                            << "): closing sid " << stream_id;
-        open_streams_.erase(it);
-        SignalStreamClosedRemotely(stream_id);
-
-      } else if ((it = queued_reset_streams_.find(stream_id)) !=
-                 queued_reset_streams_.end()) {
-        // The peer requested the reset, but there was a local reset
-        // queued.
-        RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
-                            << "): double-sided close for sid " << stream_id;
-        // Both sides want the stream closed, and the peer got to send the
-        // RE-CONFIG first.  Treat it like the local Remove(Send|Recv)Stream
-        // finished quickly.
-        queued_reset_streams_.erase(it);
-
-      } else {
-        // This stream is unknown.  Sometimes this can be from an
-        // RESET_FAILED-related retransmit.
-        RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_EVENT(" << debug_name_
-                            << "): Unknown sid " << stream_id;
+    if (evt->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
+      RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_INCOMING_SSN(" << debug_name_
+                          << "): sid " << sid;
+      status.incoming_reset_complete = true;
+      // If we receive an incoming stream reset and we haven't started the
+      // closing procedure ourselves, this means the remote side started the
+      // closing procedure; fire a signal so that the relevant data channel
+      // can change to "closing" (we still need to reset the outgoing stream
+      // before it changes to "closed").
+      if (!status.closure_initiated) {
+        SignalClosingProcedureStartedRemotely(sid);
       }
     }
+    if (evt->strreset_flags & SCTP_STREAM_RESET_OUTGOING_SSN) {
+      RTC_LOG(LS_VERBOSE) << "SCTP_STREAM_RESET_OUTGOING_SSN(" << debug_name_
+                          << "): sid " << sid;
+      status.outgoing_reset_complete = true;
+    }
+
+    // If this reset completes the closing procedure, remove the stream from
+    // our map so we can consider it closed, and fire a signal such that the
+    // relevant DataChannel will change its state to "closed" and its ID can be
+    // re-used.
+    if (status.reset_complete()) {
+      stream_status_by_sid_.erase(it);
+      SignalClosingProcedureComplete(sid);
+    }
   }
 
-  // Always try to send the queued RESET because this call indicates that the
-  // last local RESET or remote RESET has made some progress.
+  // Always try to send any queued resets because this call indicates that the
+  // last outgoing or incoming reset has made some progress.
   SendQueuedStreamResets();
 }
 
diff --git a/media/sctp/sctptransport.h b/media/sctp/sctptransport.h
index 45132e2..b2d7084 100644
--- a/media/sctp/sctptransport.h
+++ b/media/sctp/sctptransport.h
@@ -13,6 +13,7 @@
 
 #include <errno.h>
 
+#include <map>
 #include <memory>  // for unique_ptr.
 #include <set>
 #include <string>
@@ -152,17 +153,48 @@
   // congestion control)? Different than |transport_|'s "ready to send".
   bool ready_to_send_data_ = false;
 
-  typedef std::set<uint32_t> StreamSet;
-  // When a data channel opens a stream, it goes into open_streams_.  When we
-  // want to close it, the stream's ID goes into queued_reset_streams_.  When
-  // we actually transmit a RE-CONFIG chunk with that stream ID, the ID goes
-  // into sent_reset_streams_.  When we get a response RE-CONFIG chunk back
-  // acknowledging the reset, we remove the stream ID from
-  // sent_reset_streams_.  We use sent_reset_streams_ to differentiate
-  // between acknowledgment RE-CONFIG and peer-initiated RE-CONFIGs.
-  StreamSet open_streams_;
-  StreamSet queued_reset_streams_;
-  StreamSet sent_reset_streams_;
+  // Used to keep track of the status of each stream (or rather, each pair of
+  // incoming/outgoing streams with matching IDs). It's specifically used to
+  // keep track of the status of resets, but more information could be put here
+  // later.
+  //
+  // See datachannel.h for a summary of the closing procedure.
+  struct StreamStatus {
+    // Closure initiated by application via ResetStream? Note that
+    // this may be true while outgoing_reset_initiated is false if the outgoing
+    // reset needed to be queued.
+    bool closure_initiated = false;
+    // Whether we've initiated the outgoing stream reset via
+    // SCTP_RESET_STREAMS.
+    bool outgoing_reset_initiated = false;
+    // Whether usrsctp has indicated that the incoming/outgoing streams have
+    // been reset. It's expected that the peer will reset its outgoing stream
+    // (our incoming stream) after receiving the reset for our outgoing stream,
+    // though older versions of chromium won't do this. See crbug.com/559394
+    // for context.
+    bool outgoing_reset_complete = false;
+    bool incoming_reset_complete = false;
+
+    // Some helper methods to improve code readability.
+    bool is_open() const {
+      return !closure_initiated && !incoming_reset_complete &&
+             !outgoing_reset_complete;
+    }
+    // We need to send an outgoing reset if the application has closed the data
+    // channel, or if we received a reset of the incoming stream from the
+    // remote endpoint, indicating the data channel was closed remotely.
+    bool need_outgoing_reset() const {
+      return (incoming_reset_complete || closure_initiated) &&
+             !outgoing_reset_initiated;
+    }
+    bool reset_complete() const {
+      return outgoing_reset_complete && incoming_reset_complete;
+    }
+  };
+
+  // Entries should only be removed from this map if |reset_complete| is
+  // true.
+  std::map<uint32_t, StreamStatus> stream_status_by_sid_;
 
   // A static human-readable name for debugging messages.
   const char* debug_name_ = "SctpTransport";
diff --git a/media/sctp/sctptransport_unittest.cc b/media/sctp/sctptransport_unittest.cc
index 9864d7a..86f4553 100644
--- a/media/sctp/sctptransport_unittest.cc
+++ b/media/sctp/sctptransport_unittest.cc
@@ -67,56 +67,54 @@
   ReceiveDataParams last_params_;
 };
 
-class SignalReadyToSendObserver : public sigslot::has_slots<> {
+class SctpTransportObserver : public sigslot::has_slots<> {
  public:
-  SignalReadyToSendObserver() : signaled_(false) {}
-
-  void OnSignaled() { signaled_ = true; }
-
-  bool IsSignaled() { return signaled_; }
-
- private:
-  bool signaled_;
-};
-
-class SignalTransportClosedObserver : public sigslot::has_slots<> {
- public:
-  SignalTransportClosedObserver() {}
-  void BindSelf(SctpTransport* transport) {
-    transport->SignalStreamClosedRemotely.connect(
-        this, &SignalTransportClosedObserver::OnStreamClosed);
+  explicit SctpTransportObserver(SctpTransport* transport) {
+    transport->SignalClosingProcedureComplete.connect(
+        this, &SctpTransportObserver::OnClosingProcedureComplete);
+    transport->SignalReadyToSendData.connect(
+        this, &SctpTransportObserver::OnReadyToSend);
   }
-  void OnStreamClosed(int stream) { streams_.push_back(stream); }
 
   int StreamCloseCount(int stream) {
-    return std::count(streams_.begin(), streams_.end(), stream);
+    return std::count(closed_streams_.begin(), closed_streams_.end(), stream);
   }
 
   bool WasStreamClosed(int stream) {
-    return std::find(streams_.begin(), streams_.end(), stream) !=
-           streams_.end();
+    return std::find(closed_streams_.begin(), closed_streams_.end(), stream) !=
+           closed_streams_.end();
   }
 
+  bool ReadyToSend() { return ready_to_send_; }
+
  private:
-  std::vector<int> streams_;
+  void OnClosingProcedureComplete(int stream) {
+    closed_streams_.push_back(stream);
+  }
+  void OnReadyToSend() { ready_to_send_ = true; }
+
+  std::vector<int> closed_streams_;
+  bool ready_to_send_ = false;
 };
 
+// Helper class used to immediately attempt to reopen a stream as soon as it's
+// been closed.
 class SignalTransportClosedReopener : public sigslot::has_slots<> {
  public:
   SignalTransportClosedReopener(SctpTransport* transport, SctpTransport* peer)
       : transport_(transport), peer_(peer) {}
 
+  int StreamCloseCount(int stream) {
+    return std::count(streams_.begin(), streams_.end(), stream);
+  }
+
+ private:
   void OnStreamClosed(int stream) {
     transport_->OpenStream(stream);
     peer_->OpenStream(stream);
     streams_.push_back(stream);
   }
 
-  int StreamCloseCount(int stream) {
-    return std::count(streams_.begin(), streams_.end(), stream);
-  }
-
- private:
   SctpTransport* transport_;
   SctpTransport* peer_;
   std::vector<int> streams_;
@@ -356,14 +354,11 @@
   FakeDtlsTransport fake_dtls("fake dtls", 0);
   SctpFakeDataReceiver recv;
   std::unique_ptr<SctpTransport> transport(CreateTransport(&fake_dtls, &recv));
-
-  SignalReadyToSendObserver signal_observer;
-  transport->SignalReadyToSendData.connect(
-      &signal_observer, &SignalReadyToSendObserver::OnSignaled);
+  SctpTransportObserver observer(transport.get());
 
   transport->Start(kSctpDefaultPort, kSctpDefaultPort);
   fake_dtls.SetWritable(true);
-  EXPECT_TRUE_WAIT(signal_observer.IsSignaled(), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(observer.ReadyToSend(), kDefaultTimeout);
 }
 
 // Test that after an SCTP socket's buffer is filled, SignalReadyToSendData
@@ -476,10 +471,8 @@
 
 TEST_F(SctpTransportTest, ClosesRemoteStream) {
   SetupConnectedTransportsWithTwoStreams();
-  SignalTransportClosedObserver transport1_sig_receiver,
-      transport2_sig_receiver;
-  transport1_sig_receiver.BindSelf(transport1());
-  transport2_sig_receiver.BindSelf(transport2());
+  SctpTransportObserver transport1_observer(transport1());
+  SctpTransportObserver transport2_observer(transport2());
 
   SendDataResult result;
   ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
@@ -490,18 +483,16 @@
   EXPECT_TRUE_WAIT(ReceivedData(receiver1(), 2, "hi transport1"),
                    kDefaultTimeout);
 
-  // Close transport 1.  Transport 2 should notify us.
+  // Close stream 1 on transport 1. Transport 2 should notify us.
   transport1()->ResetStream(1);
-  EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
 }
 
 TEST_F(SctpTransportTest, ClosesTwoRemoteStreams) {
   SetupConnectedTransportsWithTwoStreams();
   AddStream(3);
-  SignalTransportClosedObserver transport1_sig_receiver,
-      transport2_sig_receiver;
-  transport1_sig_receiver.BindSelf(transport1());
-  transport2_sig_receiver.BindSelf(transport2());
+  SctpTransportObserver transport1_observer(transport1());
+  SctpTransportObserver transport2_observer(transport2());
 
   SendDataResult result;
   ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
@@ -515,18 +506,16 @@
   // Close two streams on one side.
   transport2()->ResetStream(2);
   transport2()->ResetStream(3);
-  EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(2), kDefaultTimeout);
-  EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(3), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(2), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(3), kDefaultTimeout);
 }
 
 TEST_F(SctpTransportTest, ClosesStreamsOnBothSides) {
   SetupConnectedTransportsWithTwoStreams();
   AddStream(3);
   AddStream(4);
-  SignalTransportClosedObserver transport1_sig_receiver,
-      transport2_sig_receiver;
-  transport1_sig_receiver.BindSelf(transport1());
-  transport2_sig_receiver.BindSelf(transport2());
+  SctpTransportObserver transport1_observer(transport1());
+  SctpTransportObserver transport2_observer(transport2());
 
   SendDataResult result;
   ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
@@ -545,10 +534,10 @@
   transport2()->ResetStream(2);
   transport2()->ResetStream(3);
   transport2()->ResetStream(4);
-  EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout);
-  EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(2), kDefaultTimeout);
-  EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(3), kDefaultTimeout);
-  EXPECT_TRUE_WAIT(transport1_sig_receiver.WasStreamClosed(4), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(2), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(3), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport1_observer.WasStreamClosed(4), kDefaultTimeout);
 }
 
 TEST_F(SctpTransportTest, RefusesHighNumberedTransports) {
@@ -557,20 +546,18 @@
   EXPECT_FALSE(AddStream(kMaxSctpSid + 1));
 }
 
-// Flaky, see webrtc:4453.
-TEST_F(SctpTransportTest, DISABLED_ReusesAStream) {
+TEST_F(SctpTransportTest, ReusesAStream) {
   // Shut down transport 1, then open it up again for reuse.
   SetupConnectedTransportsWithTwoStreams();
   SendDataResult result;
-  SignalTransportClosedObserver transport2_sig_receiver;
-  transport2_sig_receiver.BindSelf(transport2());
+  SctpTransportObserver transport2_observer(transport2());
 
   ASSERT_TRUE(SendData(transport1(), 1, "hello?", &result));
   EXPECT_EQ(SDR_SUCCESS, result);
   EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hello?"), kDefaultTimeout);
 
   transport1()->ResetStream(1);
-  EXPECT_TRUE_WAIT(transport2_sig_receiver.WasStreamClosed(1), kDefaultTimeout);
+  EXPECT_TRUE_WAIT(transport2_observer.WasStreamClosed(1), kDefaultTimeout);
   // Transport 1 is gone now.
 
   // Create a new transport 1.
@@ -579,8 +566,7 @@
   EXPECT_EQ(SDR_SUCCESS, result);
   EXPECT_TRUE_WAIT(ReceivedData(receiver2(), 1, "hi?"), kDefaultTimeout);
   transport1()->ResetStream(1);
-  EXPECT_TRUE_WAIT(transport2_sig_receiver.StreamCloseCount(1) == 2,
-                   kDefaultTimeout);
+  EXPECT_EQ_WAIT(2, transport2_observer.StreamCloseCount(1), kDefaultTimeout);
 }
 
 }  // namespace cricket
diff --git a/media/sctp/sctptransportinternal.h b/media/sctp/sctptransportinternal.h
index 9168320..0380a87 100644
--- a/media/sctp/sctptransportinternal.h
+++ b/media/sctp/sctptransportinternal.h
@@ -85,10 +85,10 @@
   // used" part. See:
   // https://bugs.chromium.org/p/chromium/issues/detail?id=619849
   virtual bool OpenStream(int sid) = 0;
-  // The inverse of OpenStream. When this method returns, the reset process may
-  // have not finished but it will have begun.
-  // TODO(deadbeef): We need a way to tell when it's done. See:
-  // https://bugs.chromium.org/p/webrtc/issues/detail?id=4453
+  // The inverse of OpenStream. Begins the closing procedure, which will
+  // eventually result in SignalClosingProcedureComplete on the side that
+  // initiates it, and both SignalClosingProcedureStartedRemotely and
+  // SignalClosingProcedureComplete on the other side.
   virtual bool ResetStream(int sid) = 0;
   // Send data down this channel (will be wrapped as SCTP packets then given to
   // usrsctp that will then post the network interface).
@@ -111,8 +111,14 @@
   // contains message payload.
   sigslot::signal2<const ReceiveDataParams&, const rtc::CopyOnWriteBuffer&>
       SignalDataReceived;
-  // Parameter is SID of closed stream.
-  sigslot::signal1<int> SignalStreamClosedRemotely;
+  // Parameter is SID; fired when we receive an incoming stream reset on an
+  // open stream, indicating that the other side started the closing procedure.
+  // After resetting the outgoing stream, SignalClosingProcedureComplete will
+  // fire too.
+  sigslot::signal1<int> SignalClosingProcedureStartedRemotely;
+  // Parameter is SID; fired when closing procedure is complete (both incoming
+  // and outgoing streams reset).
+  sigslot::signal1<int> SignalClosingProcedureComplete;
 
   // Helper for debugging.
   virtual void set_debug_name_for_testing(const char* debug_name) = 0;
diff --git a/pc/datachannel.cc b/pc/datachannel.cc
index 525b6f9..b7a717c 100644
--- a/pc/datachannel.cc
+++ b/pc/datachannel.cc
@@ -229,6 +229,7 @@
   send_ssrc_ = 0;
   send_ssrc_set_ = false;
   SetState(kClosing);
+  // Will send queued data before beginning the underlying closing procedure.
   UpdateState();
 }
 
@@ -251,7 +252,9 @@
     // blocked.
     RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP);
     if (!QueueSendDataMessage(buffer)) {
-      Close();
+      RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to queue "
+                           "additional data.";
+      CloseAbruptly();
     }
     return true;
   }
@@ -276,11 +279,6 @@
   UpdateState();
 }
 
-// The remote peer request that this channel shall be closed.
-void DataChannel::RemotePeerRequestClose() {
-  DoClose();
-}
-
 void DataChannel::SetSctpSid(int sid) {
   RTC_DCHECK_LT(config_.id, 0);
   RTC_DCHECK_GE(sid, 0);
@@ -293,6 +291,33 @@
   provider_->AddSctpDataStream(sid);
 }
 
+void DataChannel::OnClosingProcedureStartedRemotely(int sid) {
+  if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id &&
+      state_ != kClosing && state_ != kClosed) {
+    // Don't bother sending queued data since the side that initiated the
+    // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
+    // discussion about this.
+    queued_send_data_.Clear();
+    queued_control_data_.Clear();
+    // Just need to change state to kClosing, SctpTransport will handle the
+    // rest of the closing procedure and OnClosingProcedureComplete will be
+    // called later.
+    started_closing_procedure_ = true;
+    SetState(kClosing);
+  }
+}
+
+void DataChannel::OnClosingProcedureComplete(int sid) {
+  if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id) {
+    // If the closing procedure is complete, we should have finished sending
+    // all pending data and transitioned to kClosing already.
+    RTC_DCHECK_EQ(state_, kClosing);
+    RTC_DCHECK(queued_send_data_.Empty());
+    DisconnectFromProvider();
+    SetState(kClosed);
+  }
+}
+
 void DataChannel::OnTransportChannelCreated() {
   RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP);
   if (!connected_to_provider_) {
@@ -306,11 +331,15 @@
 }
 
 void DataChannel::OnTransportChannelDestroyed() {
-  // This method needs to synchronously close the data channel, which means any
-  // queued data needs to be discarded.
-  queued_send_data_.Clear();
-  queued_control_data_.Clear();
-  DoClose();
+  // The SctpTransport is going away (for example, because the SCTP m= section
+  // was rejected), so we need to close abruptly.
+  CloseAbruptly();
+}
+
+// The remote peer request that this channel shall be closed.
+void DataChannel::RemotePeerRequestClose() {
+  RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
+  CloseAbruptly();
 }
 
 void DataChannel::SetSendSsrc(uint32_t send_ssrc) {
@@ -387,7 +416,7 @@
 
       queued_received_data_.Clear();
       if (data_channel_type_ != cricket::DCT_RTP) {
-        Close();
+        CloseAbruptly();
       }
 
       return;
@@ -396,12 +425,6 @@
   }
 }
 
-void DataChannel::OnStreamClosedRemotely(int sid) {
-  if (data_channel_type_ == cricket::DCT_SCTP && sid == config_.id) {
-    Close();
-  }
-}
-
 void DataChannel::OnChannelReady(bool writable) {
   writable_ = writable;
   if (!writable) {
@@ -413,14 +436,23 @@
   UpdateState();
 }
 
-void DataChannel::DoClose() {
-  if (state_ == kClosed)
+void DataChannel::CloseAbruptly() {
+  if (state_ == kClosed) {
     return;
+  }
 
-  receive_ssrc_set_ = false;
-  send_ssrc_set_ = false;
+  if (connected_to_provider_) {
+    DisconnectFromProvider();
+  }
+
+  // Closing abruptly means any queued data gets thrown away.
+  queued_send_data_.Clear();
+  queued_control_data_.Clear();
+
+  // Still go to "kClosing" before "kClosed", since observers may be expecting
+  // that.
   SetState(kClosing);
-  UpdateState();
+  SetState(kClosed);
 }
 
 void DataChannel::UpdateState() {
@@ -460,13 +492,28 @@
       break;
     }
     case kClosing: {
+      // Wait for all queued data to be sent before beginning the closing
+      // procedure.
       if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
-        if (connected_to_provider_) {
-          DisconnectFromProvider();
-        }
-
-        if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) {
-          SetState(kClosed);
+        if (data_channel_type_ == cricket::DCT_RTP) {
+          // For RTP data channels, we can go to "closed" after we finish
+          // sending data and the send/recv SSRCs are unset.
+          if (connected_to_provider_) {
+            DisconnectFromProvider();
+          }
+          if (!send_ssrc_set_ && !receive_ssrc_set_) {
+            SetState(kClosed);
+          }
+        } else {
+          // For SCTP data channels, we need to wait for the closing procedure
+          // to complete; after calling RemoveSctpDataStream,
+          // OnClosingProcedureComplete will end up called asynchronously
+          // afterwards.
+          if (connected_to_provider_ && !started_closing_procedure_ &&
+              config_.id >= 0) {
+            started_closing_procedure_ = true;
+            provider_->RemoveSctpDataStream(config_.id);
+          }
         }
       }
       break;
@@ -498,10 +545,6 @@
 
   provider_->DisconnectDataChannel(this);
   connected_to_provider_ = false;
-
-  if (data_channel_type_ == cricket::DCT_SCTP && config_.id >= 0) {
-    provider_->RemoveSctpDataStream(config_.id);
-  }
 }
 
 void DataChannel::DeliverQueuedReceivedData() {
@@ -586,7 +629,7 @@
   RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
                        "send_result = "
                     << send_result;
-  Close();
+  CloseAbruptly();
 
   return false;
 }
@@ -653,7 +696,7 @@
     RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
                          " the CONTROL message, send_result = "
                       << send_result;
-    Close();
+    CloseAbruptly();
   }
   return retval;
 }
diff --git a/pc/datachannel.h b/pc/datachannel.h
index 1af90da..bd7ea1a 100644
--- a/pc/datachannel.h
+++ b/pc/datachannel.h
@@ -27,6 +27,9 @@
 
 class DataChannel;
 
+// TODO(deadbeef): Once RTP data channels go away, get rid of this and have
+// DataChannel depend on SctpTransportInternal (pure virtual SctpTransport
+// interface) instead.
 class DataChannelProviderInterface {
  public:
   // Sends the data to the transport.
@@ -39,7 +42,8 @@
   virtual void DisconnectDataChannel(DataChannel* data_channel) = 0;
   // Adds the data channel SID to the transport for SCTP.
   virtual void AddSctpDataStream(int sid) = 0;
-  // Removes the data channel SID from the transport for SCTP.
+  // Begins the closing procedure by sending an outgoing stream reset. Still
+  // need to wait for callbacks to tell when this completes.
   virtual void RemoveSctpDataStream(int sid) = 0;
   // Returns true if the transport channel is ready to send data.
   virtual bool ReadyToSendData() const = 0;
@@ -73,7 +77,7 @@
   // Gets the first unused odd/even id based on the DTLS role. If |role| is
   // SSL_CLIENT, the allocated id starts from 0 and takes even numbers;
   // otherwise, the id starts from 1 and takes odd numbers.
-  // Returns false if no id can be allocated.
+  // Returns false if no ID can be allocated.
   bool AllocateSid(rtc::SSLRole role, int* sid);
 
   // Attempts to reserve a specific sid. Returns false if it's unavailable.
@@ -92,7 +96,7 @@
 // DataChannel is a an implementation of the DataChannelInterface based on
 // libjingle's data engine. It provides an implementation of unreliable or
 // reliabledata channels. Currently this class is specifically designed to use
-// both RtpDataEngine and SctpDataEngine.
+// both RtpDataChannel and SctpTransport.
 
 // DataChannel states:
 // kConnecting: The channel has been created the transport might not yet be
@@ -104,6 +108,16 @@
 //           has been called with SSRC==0
 // kClosed: Both UpdateReceiveSsrc and UpdateSendSsrc has been called with
 //          SSRC==0.
+//
+// How the closing procedure works for SCTP:
+// 1. Alice calls Close(), state changes to kClosing.
+// 2. Alice finishes sending any queued data.
+// 3. Alice calls RemoveSctpDataStream, sends outgoing stream reset.
+// 4. Bob receives incoming stream reset; OnClosingProcedureStartedRemotely
+//    called.
+// 5. Bob sends outgoing stream reset. 6. Alice receives incoming reset,
+//    Bob receives acknowledgement. Both receive OnClosingProcedureComplete
+//    callback and transition to kClosed.
 class DataChannel : public DataChannelInterface,
                     public sigslot::has_slots<>,
                     public rtc::MessageHandler {
@@ -147,16 +161,21 @@
   // Slots for provider to connect signals to.
   void OnDataReceived(const cricket::ReceiveDataParams& params,
                       const rtc::CopyOnWriteBuffer& payload);
-  void OnStreamClosedRemotely(int sid);
 
-  // The remote peer request that this channel should be closed.
-  void RemotePeerRequestClose();
-
-  // The following methods are for SCTP only.
+  /********************************************
+   * The following methods are for SCTP only. *
+   ********************************************/
 
   // Sets the SCTP sid and adds to transport layer if not set yet. Should only
   // be called once.
   void SetSctpSid(int sid);
+  // The remote side started the closing procedure by resetting its outgoing
+  // stream (our incoming stream). Sets state to kClosing.
+  void OnClosingProcedureStartedRemotely(int sid);
+  // The closing procedure is complete; both incoming and outgoing stream
+  // resets are done and the channel can transition to kClosed. Called
+  // asynchronously after RemoveSctpDataStream.
+  void OnClosingProcedureComplete(int sid);
   // Called when the transport channel is created.
   // Only needs to be called for SCTP data channels.
   void OnTransportChannelCreated();
@@ -165,8 +184,12 @@
   // to kClosed.
   void OnTransportChannelDestroyed();
 
-  // The following methods are for RTP only.
+  /*******************************************
+   * The following methods are for RTP only. *
+   *******************************************/
 
+  // The remote peer requested that this channel should be closed.
+  void RemotePeerRequestClose();
   // Set the SSRC this channel should use to send data on the
   // underlying data engine. |send_ssrc| == 0 means that the channel is no
   // longer part of the session negotiation.
@@ -231,7 +254,11 @@
   };
 
   bool Init(const InternalDataChannelInit& config);
-  void DoClose();
+  // Close immediately, ignoring any queued data or closing procedure.
+  // This is called for RTP data channels when SDP indicates a channel should
+  // be removed, or SCTP data channels when the underlying SctpTransport is
+  // being destroyed.
+  void CloseAbruptly();
   void UpdateState();
   void SetState(DataState state);
   void DisconnectFromProvider();
@@ -261,6 +288,8 @@
   bool send_ssrc_set_;
   bool receive_ssrc_set_;
   bool writable_;
+  // Did we already start the graceful SCTP closing procedure?
+  bool started_closing_procedure_ = false;
   uint32_t send_ssrc_;
   uint32_t receive_ssrc_;
   // Control messages that always have to get sent out before any queued
diff --git a/pc/datachannel_unittest.cc b/pc/datachannel_unittest.cc
index f8e9880..4a713cb 100644
--- a/pc/datachannel_unittest.cc
+++ b/pc/datachannel_unittest.cc
@@ -66,6 +66,9 @@
   size_t on_buffered_amount_change_count_;
 };
 
+// TODO(deadbeef): The fact that these tests use a fake provider makes them not
+// too valuable. Should rewrite using the
+// peerconnection_datachannel_unittest.cc infrastructure.
 class SctpDataChannelTest : public testing::Test {
  protected:
   SctpDataChannelTest()
@@ -560,29 +563,6 @@
             webrtc_data_channel_->state());
 }
 
-// Tests that a already closed DataChannel does not fire onStateChange again.
-TEST_F(SctpDataChannelTest, ClosedDataChannelDoesNotFireOnStateChange) {
-  AddObserver();
-  webrtc_data_channel_->Close();
-  // OnStateChange called for kClosing and kClosed.
-  EXPECT_EQ(2U, observer_->on_state_change_count());
-
-  observer_->ResetOnStateChangeCount();
-  webrtc_data_channel_->RemotePeerRequestClose();
-  EXPECT_EQ(0U, observer_->on_state_change_count());
-}
-
-// Tests that RemotePeerRequestClose closes the local DataChannel.
-TEST_F(SctpDataChannelTest, RemotePeerRequestClose) {
-  AddObserver();
-  webrtc_data_channel_->RemotePeerRequestClose();
-
-  // OnStateChange called for kClosing and kClosed.
-  EXPECT_EQ(2U, observer_->on_state_change_count());
-  EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
-            webrtc_data_channel_->state());
-}
-
 // Tests that the DataChannel is closed if the received buffer is full.
 TEST_F(SctpDataChannelTest, ClosedWhenReceivedBufferFull) {
   SetChannelReady();
diff --git a/pc/peerconnection.cc b/pc/peerconnection.cc
index 40aced4..0a984a7 100644
--- a/pc/peerconnection.cc
+++ b/pc/peerconnection.cc
@@ -4479,6 +4479,8 @@
        ++it) {
     if (it->get() == channel) {
       if (channel->id() >= 0) {
+        // After the closing procedure is done, it's safe to use this ID for
+        // another data channel.
         sid_allocator_.ReleaseSid(channel->id());
       }
       // Since this method is triggered by a signal from the DataChannel,
@@ -5062,8 +5064,10 @@
                                       &DataChannel::OnChannelReady);
     SignalSctpDataReceived.connect(webrtc_data_channel,
                                    &DataChannel::OnDataReceived);
-    SignalSctpStreamClosedRemotely.connect(
-        webrtc_data_channel, &DataChannel::OnStreamClosedRemotely);
+    SignalSctpClosingProcedureStartedRemotely.connect(
+        webrtc_data_channel, &DataChannel::OnClosingProcedureStartedRemotely);
+    SignalSctpClosingProcedureComplete.connect(
+        webrtc_data_channel, &DataChannel::OnClosingProcedureComplete);
   }
   return true;
 }
@@ -5081,7 +5085,8 @@
   } else {
     SignalSctpReadyToSendData.disconnect(webrtc_data_channel);
     SignalSctpDataReceived.disconnect(webrtc_data_channel);
-    SignalSctpStreamClosedRemotely.disconnect(webrtc_data_channel);
+    SignalSctpClosingProcedureStartedRemotely.disconnect(webrtc_data_channel);
+    SignalSctpClosingProcedureComplete.disconnect(webrtc_data_channel);
   }
 }
 
@@ -5601,8 +5606,14 @@
       this, &PeerConnection::OnSctpTransportReadyToSendData_n);
   sctp_transport_->SignalDataReceived.connect(
       this, &PeerConnection::OnSctpTransportDataReceived_n);
-  sctp_transport_->SignalStreamClosedRemotely.connect(
-      this, &PeerConnection::OnSctpStreamClosedRemotely_n);
+  // TODO(deadbeef): All we do here is AsyncInvoke to fire the signal on
+  // another thread. Would be nice if there was a helper class similar to
+  // sigslot::repeater that did this for us, eliminating a bunch of boilerplate
+  // code.
+  sctp_transport_->SignalClosingProcedureStartedRemotely.connect(
+      this, &PeerConnection::OnSctpClosingProcedureStartedRemotely_n);
+  sctp_transport_->SignalClosingProcedureComplete.connect(
+      this, &PeerConnection::OnSctpClosingProcedureComplete_n);
   sctp_mid_ = mid;
   sctp_transport_->SetDtlsTransport(dtls_transport);
   return true;
@@ -5674,13 +5685,22 @@
   }
 }
 
-void PeerConnection::OnSctpStreamClosedRemotely_n(int sid) {
+void PeerConnection::OnSctpClosingProcedureStartedRemotely_n(int sid) {
   RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP);
   RTC_DCHECK(network_thread()->IsCurrent());
   sctp_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(),
       rtc::Bind(&sigslot::signal1<int>::operator(),
-                &SignalSctpStreamClosedRemotely, sid));
+                &SignalSctpClosingProcedureStartedRemotely, sid));
+}
+
+void PeerConnection::OnSctpClosingProcedureComplete_n(int sid) {
+  RTC_DCHECK(data_channel_type_ == cricket::DCT_SCTP);
+  RTC_DCHECK(network_thread()->IsCurrent());
+  sctp_invoker_->AsyncInvoke<void>(
+      RTC_FROM_HERE, signaling_thread(),
+      rtc::Bind(&sigslot::signal1<int>::operator(),
+                &SignalSctpClosingProcedureComplete, sid));
 }
 
 // Returns false if bundle is enabled and rtcp_mux is disabled.
diff --git a/pc/peerconnection.h b/pc/peerconnection.h
index 8bcee7a..f753955 100644
--- a/pc/peerconnection.h
+++ b/pc/peerconnection.h
@@ -811,7 +811,8 @@
   // CONTROL messages on unused SIDs and processes them as OPEN messages.
   void OnSctpTransportDataReceived_s(const cricket::ReceiveDataParams& params,
                                      const rtc::CopyOnWriteBuffer& payload);
-  void OnSctpStreamClosedRemotely_n(int sid);
+  void OnSctpClosingProcedureStartedRemotely_n(int sid);
+  void OnSctpClosingProcedureComplete_n(int sid);
 
   bool ValidateBundleSettings(const cricket::SessionDescription* desc);
   bool HasRtcpMuxEnabled(const cricket::ContentInfo* content);
@@ -993,7 +994,8 @@
   sigslot::signal2<const cricket::ReceiveDataParams&,
                    const rtc::CopyOnWriteBuffer&>
       SignalSctpDataReceived;
-  sigslot::signal1<int> SignalSctpStreamClosedRemotely;
+  sigslot::signal1<int> SignalSctpClosingProcedureStartedRemotely;
+  sigslot::signal1<int> SignalSctpClosingProcedureComplete;
 
   std::unique_ptr<SessionDescriptionInterface> current_local_description_;
   std::unique_ptr<SessionDescriptionInterface> pending_local_description_;
diff --git a/pc/peerconnectionendtoend_unittest.cc b/pc/peerconnectionendtoend_unittest.cc
index c57032d..2f5631a 100644
--- a/pc/peerconnectionendtoend_unittest.cc
+++ b/pc/peerconnectionendtoend_unittest.cc
@@ -168,7 +168,7 @@
                                  size_t remote_dc_index) {
     EXPECT_EQ_WAIT(DataChannelInterface::kOpen, local_dc->state(), kMaxWait);
 
-    EXPECT_TRUE_WAIT(remote_dc_list.size() > remote_dc_index, kMaxWait);
+    ASSERT_TRUE_WAIT(remote_dc_list.size() > remote_dc_index, kMaxWait);
     EXPECT_EQ_WAIT(DataChannelInterface::kOpen,
                    remote_dc_list[remote_dc_index]->state(),
                    kMaxWait);
@@ -605,14 +605,10 @@
 
 // Verifies that a DataChannel added from an OPEN message functions after
 // a channel has been previously closed (webrtc issue 3778).
-// This previously failed because the new channel re-uses the ID of the closed
-// channel, and the closed channel was incorrectly still assigned to the id.
-// TODO(deadbeef): This is disabled because there's currently a race condition
-// caused by the fact that a data channel signals that it's closed before it
-// really is. Re-enable this test once that's fixed.
-// See: https://bugs.chromium.org/p/webrtc/issues/detail?id=4453
+// This previously failed because the new channel re-used the ID of the closed
+// channel, and the closed channel was incorrectly still assigned to the ID.
 TEST_P(PeerConnectionEndToEndTest,
-       DISABLED_DataChannelFromOpenWorksAfterClose) {
+       DataChannelFromOpenWorksAfterPreviousChannelClosed) {
   CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
             webrtc::MockAudioDecoderFactory::CreateEmptyFactory());
 
@@ -624,12 +620,49 @@
   WaitForConnection();
 
   WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 0);
-  CloseDataChannels(caller_dc, callee_signaled_data_channels_, 0);
+  int first_channel_id = caller_dc->id();
+  // Wait for the local side to say it's closed, but not the remote side.
+  // Previously, the channel on which Close is called reported being closed
+  // prematurely, and this caused issues; see bugs.webrtc.org/4453.
+  caller_dc->Close();
+  EXPECT_EQ_WAIT(DataChannelInterface::kClosed, caller_dc->state(), kMaxWait);
 
   // Create a new channel and ensure it works after closing the previous one.
   caller_dc = caller_->CreateDataChannel("data2", init);
-
   WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
+  // Since the second channel was created after the first finished closing, it
+  // should be able to re-use the first one's ID.
+  EXPECT_EQ(first_channel_id, caller_dc->id());
+  TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1]);
+
+  CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
+}
+
+// Similar to the above test, but don't wait for the first channel to finish
+// closing before creating the second one.
+TEST_P(PeerConnectionEndToEndTest,
+       DataChannelFromOpenWorksWhilePreviousChannelClosing) {
+  CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
+            webrtc::MockAudioDecoderFactory::CreateEmptyFactory());
+
+  webrtc::DataChannelInit init;
+  rtc::scoped_refptr<DataChannelInterface> caller_dc(
+      caller_->CreateDataChannel("data", init));
+
+  Negotiate();
+  WaitForConnection();
+
+  WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 0);
+  int first_channel_id = caller_dc->id();
+  caller_dc->Close();
+
+  // Immediately create a new channel, before waiting for the previous one to
+  // transition to "closed".
+  caller_dc = caller_->CreateDataChannel("data2", init);
+  WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
+  // Since the second channel was created while the first was still closing,
+  // it should have been assigned a different ID.
+  EXPECT_NE(first_channel_id, caller_dc->id());
   TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1]);
 
   CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
diff --git a/pc/test/fakedatachannelprovider.h b/pc/test/fakedatachannelprovider.h
index 9aa0271..7e68f5f 100644
--- a/pc/test/fakedatachannelprovider.h
+++ b/pc/test/fakedatachannelprovider.h
@@ -75,6 +75,14 @@
     RTC_CHECK(sid >= 0);
     send_ssrcs_.erase(sid);
     recv_ssrcs_.erase(sid);
+    // Unlike the real SCTP transport, act like the closing procedure finished
+    // instantly, doing the same snapshot thing as below.
+    for (webrtc::DataChannel* ch : std::set<webrtc::DataChannel*>(
+             connected_channels_.begin(), connected_channels_.end())) {
+      if (connected_channels_.count(ch)) {
+        ch->OnClosingProcedureComplete(sid);
+      }
+    }
   }
 
   bool ReadyToSendData() const override { return ready_to_send_; }
diff --git a/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java b/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java
index fa61b3d..7e3dee1 100644
--- a/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java
+++ b/sdk/android/instrumentationtests/src/org/webrtc/PeerConnectionTest.java
@@ -887,6 +887,8 @@
     answeringExpectations.expectStateChange(DataChannel.State.CLOSED);
     answeringExpectations.dataChannel.close();
     offeringExpectations.dataChannel.close();
+    assertTrue(offeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS));
+    assertTrue(answeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS));
 
     // Test SetBitrate.
     assertTrue(offeringPC.setBitrate(100000, 5000000, 500000000));
@@ -1048,6 +1050,8 @@
     answeringExpectations.expectStateChange(DataChannel.State.CLOSED);
     answeringExpectations.dataChannel.close();
     offeringExpectations.dataChannel.close();
+    assertTrue(offeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS));
+    assertTrue(answeringExpectations.waitForAllExpectationsToBeSatisfied(TIMEOUT_SECONDS));
 
     // Free the Java-land objects and collect them.
     shutdownPC(offeringPC, offeringExpectations);