dcsctp: Track open channels accurately
In rare cases, it is possible to queue a call to SendData from the signaling
thread on a channel being closed or already closed in the network thread.
By keeping track of currently open streams, we avoid sending messages
with a stream id of channels that the other side already considers closed
and has already reused for a new channel.
This caused rare messages to be delivered on the wrong data channel if
a message was quickly sent, channel closed and a new one reopened.
Bug: webrtc:14277
Change-Id: If35fed8d12d5d2c18cdc6601085d8b632c37a0ba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272624
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37880}
diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc
index c8e6da4..062360d 100644
--- a/media/sctp/dcsctp_transport.cc
+++ b/media/sctp/dcsctp_transport.cc
@@ -218,16 +218,17 @@
}
bool DcSctpTransport::OpenStream(int sid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
- if (!socket_) {
- RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid
- << "): Transport is not started.";
- return false;
- }
+
+ StreamState stream_state;
+ stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
+ stream_state);
return true;
}
bool DcSctpTransport::ResetStream(int sid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
if (!socket_) {
RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
@@ -237,14 +238,21 @@
dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
- StreamClosingState& closing_state = closing_states_[streams[0]];
- if (closing_state.closure_initiated || closing_state.incoming_reset_done ||
- closing_state.outgoing_reset_done) {
+ auto it = stream_states_.find(streams[0]);
+ if (it == stream_states_.end()) {
+ RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
+ << "): Stream is not open.";
+ return false;
+ }
+
+ StreamState& stream_state = it->second;
+ if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
+ stream_state.outgoing_reset_done) {
// The closing procedure was already initiated by the remote, don't do
// anything.
return false;
}
- closing_state.closure_initiated = true;
+ stream_state.closure_initiated = true;
socket_->ResetStreams(streams);
return true;
}
@@ -265,6 +273,30 @@
return false;
}
+ // It is possible for a message to be sent from the signaling thread at the
+ // same time a data-channel is closing, but before the signaling thread is
+ // aware of it. So we need to keep track of currently active data channels and
+ // skip sending messages for the ones that are not open or closing.
+ // The sending errors are not impacting the data channel API contract as
+ // it is allowed to discard queued messages when the channel is closing.
+ auto stream_state =
+ stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
+ if (stream_state == stream_states_.end()) {
+ RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
+ << sid;
+ *result = cricket::SDR_ERROR;
+ return false;
+ }
+
+ if (stream_state->second.closure_initiated ||
+ stream_state->second.incoming_reset_done ||
+ stream_state->second.outgoing_reset_done) {
+ RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
+ << sid;
+ *result = cricket::SDR_ERROR;
+ return false;
+ }
+
auto max_message_size = socket_->options().max_message_size;
if (max_message_size > 0 && payload.size() > max_message_size) {
RTC_LOG(LS_WARNING) << debug_name_
@@ -519,16 +551,23 @@
RTC_LOG(LS_INFO) << debug_name_
<< "->OnStreamsResetPerformed(...): Outgoing stream reset"
<< ", sid=" << stream_id.value();
- StreamClosingState& closing_state = closing_states_[stream_id];
- closing_state.outgoing_reset_done = true;
- if (closing_state.incoming_reset_done) {
+ auto it = stream_states_.find(stream_id);
+ if (it == stream_states_.end()) {
+ // Ignoring an outgoing stream reset for a closed stream
+ return;
+ }
+
+ StreamState& stream_state = it->second;
+ stream_state.outgoing_reset_done = true;
+
+ if (stream_state.incoming_reset_done) {
// When the close was not initiated locally, we can signal the end of the
// data channel close procedure when the remote ACKs the reset.
if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value());
}
- closing_states_.erase(stream_id);
+ stream_states_.erase(stream_id);
}
}
}
@@ -540,10 +579,15 @@
RTC_LOG(LS_INFO) << debug_name_
<< "->OnIncomingStreamsReset(...): Incoming stream reset"
<< ", sid=" << stream_id.value();
- StreamClosingState& closing_state = closing_states_[stream_id];
- closing_state.incoming_reset_done = true;
- if (!closing_state.closure_initiated) {
+ auto it = stream_states_.find(stream_id);
+ if (it == stream_states_.end())
+ return;
+
+ StreamState& stream_state = it->second;
+ stream_state.incoming_reset_done = true;
+
+ if (!stream_state.closure_initiated) {
// When receiving an incoming stream reset event for a non local close
// procedure, the transport needs to reset the stream in the other
// direction too.
@@ -554,13 +598,13 @@
}
}
- if (closing_state.outgoing_reset_done) {
+ if (stream_state.outgoing_reset_done) {
// The close procedure that was initiated locally is complete when we
// receive and incoming reset event.
if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value());
}
- closing_states_.erase(stream_id);
+ stream_states_.erase(stream_id);
}
}
}