dcsctp: Track open channels accurately

In rare cases, it is possible to queue a call to SendData from the signaling
thread on a channel being closed or already closed in the network thread.
By keeping track of currently open streams, we avoid sending messages
with a stream id of channels that the other side already considers closed
and has already reused for a new channel.
Previously, this race could occasionally cause a message to be delivered on
the wrong data channel when a message was sent, the channel was closed, and
a new channel was opened with the same stream id in quick succession.

Bug: webrtc:14277
Change-Id: If35fed8d12d5d2c18cdc6601085d8b632c37a0ba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272624
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37880}
diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc
index c8e6da4..062360d 100644
--- a/media/sctp/dcsctp_transport.cc
+++ b/media/sctp/dcsctp_transport.cc
@@ -218,16 +218,17 @@
 }
 
 bool DcSctpTransport::OpenStream(int sid) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
-  if (!socket_) {
-    RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
-                      << "): Transport is not started.";
-    return false;
-  }
+
+  StreamState stream_state;
+  stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
+                                  stream_state);
   return true;
 }
 
 bool DcSctpTransport::ResetStream(int sid) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
   if (!socket_) {
     RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
@@ -237,14 +238,21 @@
 
   dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
 
-  StreamClosingState& closing_state = closing_states_[streams[0]];
-  if (closing_state.closure_initiated || closing_state.incoming_reset_done ||
-      closing_state.outgoing_reset_done) {
+  auto it = stream_states_.find(streams[0]);
+  if (it == stream_states_.end()) {
+    RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
+                      << "): Stream is not open.";
+    return false;
+  }
+
+  StreamState& stream_state = it->second;
+  if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
+      stream_state.outgoing_reset_done) {
     // The closing procedure was already initiated by the remote, don't do
     // anything.
     return false;
   }
-  closing_state.closure_initiated = true;
+  stream_state.closure_initiated = true;
   socket_->ResetStreams(streams);
   return true;
 }
@@ -265,6 +273,30 @@
     return false;
   }
 
+  // It is possible for a message to be sent from the signaling thread at the
+  // same time a data-channel is closing, but before the signaling thread is
+  // aware of it. So we need to keep track of currently active data channels and
+  // skip sending messages for the ones that are not open or are closing.
+  // The sending errors are not impacting the data channel API contract as
+  // it is allowed to discard queued messages when the channel is closing.
+  auto stream_state =
+      stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
+  if (stream_state == stream_states_.end()) {
+    RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
+                        << sid;
+    *result = cricket::SDR_ERROR;
+    return false;
+  }
+
+  if (stream_state->second.closure_initiated ||
+      stream_state->second.incoming_reset_done ||
+      stream_state->second.outgoing_reset_done) {
+    RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
+                        << sid;
+    *result = cricket::SDR_ERROR;
+    return false;
+  }
+
   auto max_message_size = socket_->options().max_message_size;
   if (max_message_size > 0 && payload.size() > max_message_size) {
     RTC_LOG(LS_WARNING) << debug_name_
@@ -519,16 +551,23 @@
     RTC_LOG(LS_INFO) << debug_name_
                      << "->OnStreamsResetPerformed(...): Outgoing stream reset"
                      << ", sid=" << stream_id.value();
-    StreamClosingState& closing_state = closing_states_[stream_id];
-    closing_state.outgoing_reset_done = true;
 
-    if (closing_state.incoming_reset_done) {
+    auto it = stream_states_.find(stream_id);
+    if (it == stream_states_.end()) {
+      // Ignoring an outgoing stream reset for a closed stream
+      return;
+    }
+
+    StreamState& stream_state = it->second;
+    stream_state.outgoing_reset_done = true;
+
+    if (stream_state.incoming_reset_done) {
       //  When the close was not initiated locally, we can signal the end of the
       //  data channel close procedure when the remote ACKs the reset.
       if (data_channel_sink_) {
         data_channel_sink_->OnChannelClosed(stream_id.value());
       }
-      closing_states_.erase(stream_id);
+      stream_states_.erase(stream_id);
     }
   }
 }
@@ -540,10 +579,15 @@
     RTC_LOG(LS_INFO) << debug_name_
                      << "->OnIncomingStreamsReset(...): Incoming stream reset"
                      << ", sid=" << stream_id.value();
-    StreamClosingState& closing_state = closing_states_[stream_id];
-    closing_state.incoming_reset_done = true;
 
-    if (!closing_state.closure_initiated) {
+    auto it = stream_states_.find(stream_id);
+    if (it == stream_states_.end())
+      return;
+
+    StreamState& stream_state = it->second;
+    stream_state.incoming_reset_done = true;
+
+    if (!stream_state.closure_initiated) {
       // When receiving an incoming stream reset event for a non local close
       // procedure, the transport needs to reset the stream in the other
       // direction too.
@@ -554,13 +598,13 @@
       }
     }
 
-    if (closing_state.outgoing_reset_done) {
+    if (stream_state.outgoing_reset_done) {
       // The close procedure that was initiated locally is complete when we
       // receive and incoming reset event.
       if (data_channel_sink_) {
         data_channel_sink_->OnChannelClosed(stream_id.value());
       }
-      closing_states_.erase(stream_id);
+      stream_states_.erase(stream_id);
     }
   }
 }