dcsctp: Handle rapid closing of streams

When streams were to be reset, but there was already an ongoing
stream reset command in-flight, those streams wouldn't be properly
reset. When multiple streams were reset close to each other (within
an RTT), some streams would not have their SSNs reset, which resulted
in the stream resuming the SSN sequence. This could result in ordered
streams not delivering all messages as the receiver wouldn't deliver any
messages with SSN different from the expected SSN=0.

In WebRTC data channels, this would be triggered if multiple channels
were closed at roughly the same time, then re-opened, and continued
to be used in ordered mode. Unordered messages would still be delivered,
but the stream state could be wrong as the DATA_CHANNEL_ACK message is
sent ordered, and possibly not delivered.

There were unit tests for this, but not on the socket level using
real components, but just on the stream reset handler using mocks,
where this issue wasn't found. Also, those mocks didn't validate that
the correct parameters were provided, so that's fixed now.

The root cause was the PrepareResetStreams was only called if there
wasn't an ongoing stream reset operation in progress. One may try to
solve it by always calling PrepareResetStreams also when there is an
ongoing request, or to call it when the request has finished. One would
then realize that when the response of the outgoing stream request is
received, and CommitResetStreams is called, it would reset all paused
and (prepared) to-be-reset streams - not just the ones in the outgoing
stream request.

One cause of this was the lack of a single source of truth of the stream
states. The SendQueue kept track of which streams that were paused, but
the stream reset handler kept track of which streams that were
resetting. As that's error prone, this CL moves the source of truth
completely to the SendQueue and defining explicit stream pause states. A
stream can be in one of these possible states:

  * Not paused. This is the default for an active stream.
  * Pending to be paused. This is when it's about to be reset, but
    there is a message that has been partly sent, with fragments
    remaining to be sent before it can be paused.
  * Paused, with no partly sent message. In this state, it's ready to
    be reset.
  * Resetting. A stream transitions into this state when it has been
    paused and has been included in an outgoing stream reset request.
    When this request has been responded to, the stream can really be
    reset (SSN=0, MID=0).

This CL also improves logging, and adds socket tests to catch this
issue.

Bug: webrtc:13994, chromium:1320194
Change-Id: I883570d1f277bc01e52b1afad62d6be2aca930a2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261180
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36771}
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index b4bc9c4..914bea3 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -36,6 +36,7 @@
 #include "net/dcsctp/packet/error_cause/error_cause.h"
 #include "net/dcsctp/packet/error_cause/unrecognized_chunk_type_cause.h"
 #include "net/dcsctp/packet/parameter/heartbeat_info_parameter.h"
+#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
 #include "net/dcsctp/packet/parameter/parameter.h"
 #include "net/dcsctp/packet/sctp_packet.h"
 #include "net/dcsctp/packet/tlv_trait.h"
@@ -229,6 +230,44 @@
   return true;
 }
 
+MATCHER_P(HasReconfigWithStreams, streams_matcher, "") {
+  absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
+  if (!packet.has_value()) {
+    *result_listener << "data didn't parse as an SctpPacket";
+    return false;
+  }
+
+  if (packet->descriptors()[0].type != ReConfigChunk::kType) {
+    *result_listener << "the first chunk in the packet is not a data chunk";
+    return false;
+  }
+
+  absl::optional<ReConfigChunk> reconfig =
+      ReConfigChunk::Parse(packet->descriptors()[0].data);
+  if (!reconfig.has_value()) {
+    *result_listener << "The first chunk didn't parse as a data chunk";
+    return false;
+  }
+
+  const Parameters& parameters = reconfig->parameters();
+  if (parameters.descriptors().size() != 1 ||
+      parameters.descriptors()[0].type !=
+          OutgoingSSNResetRequestParameter::kType) {
+    *result_listener << "Expected the reconfig chunk to have an outgoing SSN "
+                        "reset request parameter";
+    return false;
+  }
+
+  absl::optional<OutgoingSSNResetRequestParameter> p =
+      OutgoingSSNResetRequestParameter::Parse(parameters.descriptors()[0].data);
+  testing::Matcher<rtc::ArrayView<const StreamID>> matcher = streams_matcher;
+  if (!matcher.MatchAndExplain(p->stream_ids(), result_listener)) {
+    return false;
+  }
+
+  return true;
+}
+
 TSN AddTo(TSN tsn, int delta) {
   return TSN(*tsn + delta);
 }
@@ -2232,5 +2271,76 @@
 
   ExchangeMessages(a, z);
 }
+
+TEST(DcSctpSocketTest, CloseStreamsWithPendingRequest) {
+  // Checks that stream reset requests are properly paused when they can't be
+  // immediately reset - i.e. when there is already an ongoing stream reset
+  // request (and there can only be a single one in-flight).
+  SocketUnderTest a("A");
+  SocketUnderTest z("Z");
+
+  EXPECT_CALL(z.cb, OnIncomingStreamsReset(ElementsAre(StreamID(1)))).Times(1);
+  EXPECT_CALL(z.cb, OnIncomingStreamsReset(
+                        UnorderedElementsAre(StreamID(2), StreamID(3))))
+      .Times(1);
+  EXPECT_CALL(a.cb, OnStreamsResetPerformed(ElementsAre(StreamID(1)))).Times(1);
+  EXPECT_CALL(a.cb, OnStreamsResetPerformed(
+                        UnorderedElementsAre(StreamID(2), StreamID(3))))
+      .Times(1);
+
+  ConnectSockets(a, z);
+
+  SendOptions send_options = {.unordered = IsUnordered(false)};
+
+  // Send a few ordered messages
+  a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options);
+  a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options);
+  a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options);
+
+  ExchangeMessages(a, z);
+
+  // Receive these messages
+  absl::optional<DcSctpMessage> msg1 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg1.has_value());
+  EXPECT_EQ(msg1->stream_id(), StreamID(1));
+  absl::optional<DcSctpMessage> msg2 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg2.has_value());
+  EXPECT_EQ(msg2->stream_id(), StreamID(2));
+  absl::optional<DcSctpMessage> msg3 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg3.has_value());
+  EXPECT_EQ(msg3->stream_id(), StreamID(3));
+
+  // Reset the streams - not all at once.
+  a.socket.ResetStreams(std::vector<StreamID>({StreamID(1)}));
+
+  std::vector<uint8_t> packet = a.cb.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasReconfigWithStreams(ElementsAre(StreamID(1))));
+  z.socket.ReceivePacket(std::move(packet));
+
+  // Sending more reset requests while this one is ongoing.
+
+  a.socket.ResetStreams(std::vector<StreamID>({StreamID(2)}));
+  a.socket.ResetStreams(std::vector<StreamID>({StreamID(3)}));
+
+  ExchangeMessages(a, z);
+
+  // Send a few more ordered messages
+  a.socket.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), send_options);
+  a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), send_options);
+  a.socket.Send(DcSctpMessage(StreamID(3), PPID(53), {1, 2}), send_options);
+
+  ExchangeMessages(a, z);
+
+  // Receive these messages
+  absl::optional<DcSctpMessage> msg4 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg4.has_value());
+  EXPECT_EQ(msg4->stream_id(), StreamID(1));
+  absl::optional<DcSctpMessage> msg5 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg5.has_value());
+  EXPECT_EQ(msg5->stream_id(), StreamID(2));
+  absl::optional<DcSctpMessage> msg6 = z.cb.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg6.has_value());
+  EXPECT_EQ(msg6->stream_id(), StreamID(3));
+}  // namespace
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc
index 1c6ce09..2d66658 100644
--- a/net/dcsctp/socket/stream_reset_handler.cc
+++ b/net/dcsctp/socket/stream_reset_handler.cc
@@ -270,16 +270,13 @@
   // Only send stream resets if there are streams to reset, and no current
   // ongoing request (there can only be one at a time), and if the stream
   // can be reset.
-  if (streams_to_reset_.empty() || current_request_.has_value() ||
-      !retransmission_queue_->CanResetStreams()) {
+  if (current_request_.has_value() ||
+      !retransmission_queue_->HasStreamsReadyToBeReset()) {
     return absl::nullopt;
   }
 
-  std::vector<StreamID> streams_to_reset(streams_to_reset_.begin(),
-                                         streams_to_reset_.end());
   current_request_.emplace(TSN(*retransmission_queue_->next_tsn() - 1),
-                           std::move(streams_to_reset));
-  streams_to_reset_.clear();
+                           retransmission_queue_->GetStreamsReadyToBeReset());
   reconfig_timer_->set_duration(ctx_->current_rto());
   reconfig_timer_->Start();
   return MakeReconfigChunk();
@@ -310,18 +307,8 @@
 
 void StreamResetHandler::ResetStreams(
     rtc::ArrayView<const StreamID> outgoing_streams) {
-  // Enqueue streams to be reset - as this may be called multiple times
-  // while a request is already in progress (and there can only be one).
   for (StreamID stream_id : outgoing_streams) {
-    streams_to_reset_.insert(stream_id);
-  }
-  if (current_request_.has_value()) {
-    // Already an ongoing request - will need to wait for it to finish as
-    // there can only be one in-flight ReConfig chunk with requests at any
-    // time.
-  } else {
-    retransmission_queue_->PrepareResetStreams(std::vector<StreamID>(
-        streams_to_reset_.begin(), streams_to_reset_.end()));
+    retransmission_queue_->PrepareResetStream(stream_id);
   }
 }
 
@@ -345,7 +332,7 @@
 
 HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const {
   HandoverReadinessStatus status;
-  if (!streams_to_reset_.empty()) {
+  if (retransmission_queue_->HasStreamsReadyToBeReset()) {
     status.Add(HandoverUnreadinessReason::kPendingStreamReset);
   }
   if (current_request_.has_value()) {
diff --git a/net/dcsctp/socket/stream_reset_handler.h b/net/dcsctp/socket/stream_reset_handler.h
index a691eb8..6e49665 100644
--- a/net/dcsctp/socket/stream_reset_handler.h
+++ b/net/dcsctp/socket/stream_reset_handler.h
@@ -216,10 +216,6 @@
   RetransmissionQueue* retransmission_queue_;
   const std::unique_ptr<Timer> reconfig_timer_;
 
-  // Outgoing streams that have been requested to be reset, but hasn't yet
-  // been included in an outgoing request.
-  webrtc::flat_set<StreamID> streams_to_reset_;
-
   // The next sequence number for outgoing stream requests.
   ReconfigRequestSN next_outgoing_req_seq_nbr_;
 
diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc
index 6f6874f..1b7463e 100644
--- a/net/dcsctp/socket/stream_reset_handler_test.cc
+++ b/net/dcsctp/socket/stream_reset_handler_test.cc
@@ -343,10 +343,13 @@
 }
 
 TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
+
   absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig.has_value());
   ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -360,13 +363,21 @@
 }
 
 TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(3);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(40)));
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(41)));
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42))).Times(2);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(43)));
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(44)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
   handler_->ResetStreams(
       std::vector<StreamID>({StreamID(43), StreamID(44), StreamID(41)}));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42), StreamID(40)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(
+          std::vector<StreamID>({StreamID(40), StreamID(41), StreamID(42),
+                                 StreamID(43), StreamID(44)})));
   absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig.has_value());
   ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -382,10 +393,10 @@
 }
 
 TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams())
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset())
       .WillOnce(Return(false))
       .WillOnce(Return(false))
       .WillOnce(Return(true));
@@ -396,10 +407,12 @@
 }
 
 TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
 
   absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig.has_value());
@@ -412,8 +425,8 @@
       req.request_sequence_number(), ResponseResult::kSuccessPerformed));
   ReConfigChunk response_reconfig(builder.Build());
 
-  EXPECT_CALL(producer_, CommitResetStreams()).Times(1);
-  EXPECT_CALL(producer_, RollbackResetStreams()).Times(0);
+  EXPECT_CALL(producer_, CommitResetStreams);
+  EXPECT_CALL(producer_, RollbackResetStreams).Times(0);
 
   // Processing a response shouldn't result in sending anything.
   EXPECT_CALL(callbacks_, OnError).Times(0);
@@ -422,10 +435,12 @@
 }
 
 TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
 
   absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig.has_value());
@@ -438,8 +453,8 @@
       req.request_sequence_number(), ResponseResult::kErrorBadSequenceNumber));
   ReConfigChunk response_reconfig(builder.Build());
 
-  EXPECT_CALL(producer_, CommitResetStreams()).Times(0);
-  EXPECT_CALL(producer_, RollbackResetStreams()).Times(1);
+  EXPECT_CALL(producer_, CommitResetStreams).Times(0);
+  EXPECT_CALL(producer_, RollbackResetStreams);
 
   // Only requests should result in sending responses.
   EXPECT_CALL(callbacks_, OnError).Times(0);
@@ -450,10 +465,12 @@
 TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) {
   static constexpr StreamID kStreamToReset = StreamID(42);
 
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(kStreamToReset));
   handler_->ResetStreams(std::vector<StreamID>({kStreamToReset}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({kStreamToReset})));
 
   absl::optional<ReConfigChunk> reconfig1 = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig1.has_value());
@@ -499,10 +516,13 @@
 }
 
 TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
+
   absl::optional<ReConfigChunk> reconfig1 = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig1.has_value());
   ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -514,6 +534,8 @@
   EXPECT_THAT(req1.stream_ids(), UnorderedElementsAre(StreamID(42)));
 
   // Streams reset while the request is in-flight will be queued.
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(41)));
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(43)));
   StreamID stream_ids[] = {StreamID(41), StreamID(43)};
   handler_->ResetStreams(stream_ids);
   EXPECT_EQ(handler_->MakeStreamResetRequest(), absl::nullopt);
@@ -532,7 +554,10 @@
   handler_->HandleReConfig(std::move(response_reconfig));
 
   // Response has been processed. A new request can be sent.
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(41), StreamID(43)})));
+
   absl::optional<ReConfigChunk> reconfig2 = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig2.has_value());
   ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -591,21 +616,31 @@
 
 TEST_F(StreamResetHandlerTest,
        HandoverIsAllowedOnlyWhenNoStreamIsBeingOrWillBeReset) {
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
   EXPECT_EQ(
       handler_->GetHandoverReadiness(),
       HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset())
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
+
   ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value());
   EXPECT_EQ(handler_->GetHandoverReadiness(),
             HandoverReadinessStatus(
                 HandoverUnreadinessReason::kPendingStreamResetRequest));
 
   // Reset more streams while the request is in-flight.
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(41)));
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(43)));
   StreamID stream_ids[] = {StreamID(41), StreamID(43)};
   handler_->ResetStreams(stream_ids);
+
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
   EXPECT_EQ(handler_->GetHandoverReadiness(),
             HandoverReadinessStatus()
                 .Add(HandoverUnreadinessReason::kPendingStreamResetRequest)
@@ -618,12 +653,18 @@
                         .Add(ReconfigurationResponseParameter(
                             kMyInitialReqSn, ResponseResult::kSuccessPerformed))
                         .Build()));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
   EXPECT_EQ(
       handler_->GetHandoverReadiness(),
       HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset));
 
   // Second request can be sent.
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset())
+      .WillOnce(Return(true))
+      .WillOnce(Return(false));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(41), StreamID(43)})));
+
   ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value());
   EXPECT_EQ(handler_->GetHandoverReadiness(),
             HandoverReadinessStatus(
@@ -638,16 +679,21 @@
           .Build()));
 
   // Seconds response has been processed. No pending resets.
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(false));
+
   EXPECT_TRUE(handler_->GetHandoverReadiness().IsReady());
 }
 
 TEST_F(StreamResetHandlerTest, HandoverInInitialState) {
   PerformHandover();
 
-  EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+  EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
   handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-  EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+  EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+      .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
+
   absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
   ASSERT_TRUE(reconfig.has_value());
   ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -663,10 +709,15 @@
 TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) {
   // Reset one stream
   {
-    EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+    EXPECT_CALL(producer_, PrepareResetStream(StreamID(42)));
     handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
 
-    EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+    EXPECT_CALL(producer_, HasStreamsReadyToBeReset())
+        .WillOnce(Return(true))
+        .WillOnce(Return(false));
+    EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+        .WillOnce(Return(std::vector<StreamID>({StreamID(42)})));
+
     ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig,
                                 handler_->MakeStreamResetRequest());
     ASSERT_HAS_VALUE_AND_ASSIGN(
@@ -690,10 +741,13 @@
 
   // Reset another stream after handover
   {
-    EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
+    EXPECT_CALL(producer_, PrepareResetStream(StreamID(43)));
     handler_->ResetStreams(std::vector<StreamID>({StreamID(43)}));
 
-    EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
+    EXPECT_CALL(producer_, HasStreamsReadyToBeReset()).WillOnce(Return(true));
+    EXPECT_CALL(producer_, GetStreamsReadyToBeReset())
+        .WillOnce(Return(std::vector<StreamID>({StreamID(43)})));
+
     ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig,
                                 handler_->MakeStreamResetRequest());
     ASSERT_HAS_VALUE_AND_ASSIGN(
diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h
index 0cf6458..82e96b7 100644
--- a/net/dcsctp/tx/mock_send_queue.h
+++ b/net/dcsctp/tx/mock_send_queue.h
@@ -11,6 +11,7 @@
 #define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
 
 #include <cstdint>
+#include <vector>
 
 #include "absl/types/optional.h"
 #include "api/array_view.h"
@@ -35,11 +36,9 @@
               Discard,
               (IsUnordered unordered, StreamID stream_id, MID message_id),
               (override));
-  MOCK_METHOD(void,
-              PrepareResetStreams,
-              (rtc::ArrayView<const StreamID> streams),
-              (override));
-  MOCK_METHOD(bool, CanResetStreams, (), (const, override));
+  MOCK_METHOD(void, PrepareResetStream, (StreamID stream_id), (override));
+  MOCK_METHOD(bool, HasStreamsReadyToBeReset, (), (const, override));
+  MOCK_METHOD(std::vector<StreamID>, GetStreamsReadyToBeReset, (), (override));
   MOCK_METHOD(void, CommitResetStreams, (), (override));
   MOCK_METHOD(void, RollbackResetStreams, (), (override));
   MOCK_METHOD(void, Reset, (), (override));
diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc
index 4afc01b..5755919 100644
--- a/net/dcsctp/tx/retransmission_queue.cc
+++ b/net/dcsctp/tx/retransmission_queue.cc
@@ -529,16 +529,15 @@
   return std::min(rwnd(), left);
 }
 
-void RetransmissionQueue::PrepareResetStreams(
-    rtc::ArrayView<const StreamID> streams) {
+void RetransmissionQueue::PrepareResetStream(StreamID stream_id) {
   // TODO(boivie): These calls are now only affecting the send queue. The
   // packet buffer can also change behavior - for example draining the chunk
   // producer and eagerly assign TSNs so that an "Outgoing SSN Reset Request"
   // can be sent quickly, with a known `sender_last_assigned_tsn`.
-  send_queue_.PrepareResetStreams(streams);
+  send_queue_.PrepareResetStream(stream_id);
 }
-bool RetransmissionQueue::CanResetStreams() const {
-  return send_queue_.CanResetStreams();
+bool RetransmissionQueue::HasStreamsReadyToBeReset() const {
+  return send_queue_.HasStreamsReadyToBeReset();
 }
 void RetransmissionQueue::CommitResetStreams() {
   send_queue_.CommitResetStreams();
diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h
index 1e866b3..1958dfd 100644
--- a/net/dcsctp/tx/retransmission_queue.h
+++ b/net/dcsctp/tx/retransmission_queue.h
@@ -143,8 +143,11 @@
 
   // See the SendQueue for a longer description of these methods related
   // to stream resetting.
-  void PrepareResetStreams(rtc::ArrayView<const StreamID> streams);
-  bool CanResetStreams() const;
+  void PrepareResetStream(StreamID stream_id);
+  bool HasStreamsReadyToBeReset() const;
+  std::vector<StreamID> GetStreamsReadyToBeReset() const {
+    return send_queue_.GetStreamsReadyToBeReset();
+  }
   void CommitResetStreams();
   void RollbackResetStreams();
 
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index b3e695b..d4ce59d 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -41,6 +41,12 @@
 }
 
 bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
+  if (pause_state_ == PauseState::kPaused ||
+      pause_state_ == PauseState::kResetting) {
+    // The stream has paused (and there is no partially sent message).
+    return false;
+  }
+
   while (!items_.empty()) {
     RRSendQueue::OutgoingStream::Item& item = items_.front();
     if (item.message_id.has_value()) {
@@ -59,10 +65,6 @@
       continue;
     }
 
-    if (is_paused_) {
-      // The stream has paused (and there is no partially sent message).
-      return false;
-    }
     return true;
   }
   return false;
@@ -139,6 +141,8 @@
 SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now,
                                                            size_t max_size) {
   RTC_DCHECK(!items_.empty());
+  RTC_DCHECK(pause_state_ != PauseState::kPaused &&
+             pause_state_ != PauseState::kResetting);
 
   Item* item = &items_.front();
   DcSctpMessage& message = item->message;
@@ -196,6 +200,12 @@
     // The entire message has been sent, and its last data copied to `chunk`, so
     // it can safely be discarded.
     items_.pop_front();
+
+    if (pause_state_ == PauseState::kPending) {
+      RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id
+                           << " is moving from pending to paused";
+      pause_state_ = PauseState::kPaused;
+    }
   } else {
     item->remaining_offset += chunk_payload.size();
     item->remaining_size -= chunk_payload.size();
@@ -217,6 +227,11 @@
       buffered_amount_.Decrease(item.remaining_size);
       total_buffered_amount_.Decrease(item.remaining_size);
       items_.pop_front();
+
+      if (pause_state_ == PauseState::kPending) {
+        pause_state_ = PauseState::kPaused;
+      }
+
       // As the item still existed, it had unsent data.
       result = true;
     }
@@ -226,7 +241,12 @@
 }
 
 void RRSendQueue::OutgoingStream::Pause() {
-  is_paused_ = true;
+  if (pause_state_ != PauseState::kNotPaused) {
+    // Already in progress.
+    return;
+  }
+
+  bool had_pending_items = !items_.empty();
 
   // https://datatracker.ietf.org/doc/html/rfc8831#section-6.7
   // "Closing of a data channel MUST be signaled by resetting the corresponding
@@ -250,10 +270,33 @@
       ++it;
     }
   }
+
+  pause_state_ = (items_.empty() || items_.front().remaining_offset == 0)
+                     ? PauseState::kPaused
+                     : PauseState::kPending;
+
+  if (had_pending_items && pause_state_ == PauseState::kPaused) {
+    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
+                         << " was previously active, but is now paused.";
+  }
+
+  RTC_DCHECK(IsConsistent());
+}
+
+void RRSendQueue::OutgoingStream::Resume() {
+  RTC_DCHECK(pause_state_ == PauseState::kResetting);
+  if (!items_.empty()) {
+    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
+                         << " was previously paused, but is now active.";
+  }
+  pause_state_ = PauseState::kNotPaused;
   RTC_DCHECK(IsConsistent());
 }
 
 void RRSendQueue::OutgoingStream::Reset() {
+  // This can be called both when an outgoing stream reset has been responded
+  // to, or when the entire SendQueue is reset due to detecting the peer having
+  // restarted. The stream may be in any state at this time.
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -268,7 +311,7 @@
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
   }
-  is_paused_ = false;
+  pause_state_ = PauseState::kNotPaused;
   next_ordered_mid_ = MID(0);
   next_unordered_mid_ = MID(0);
   next_ssn_ = SSN(0);
@@ -381,27 +424,39 @@
   return has_discarded;
 }
 
-void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
-  for (StreamID stream_id : streams) {
-    GetOrCreateStreamInfo(stream_id).Pause();
-  }
+void RRSendQueue::PrepareResetStream(StreamID stream_id) {
+  GetOrCreateStreamInfo(stream_id).Pause();
   RTC_DCHECK(IsConsistent());
 }
 
-bool RRSendQueue::CanResetStreams() const {
-  // Streams can be reset if those streams that are paused don't have any
-  // messages that are partially sent.
+bool RRSendQueue::HasStreamsReadyToBeReset() const {
   for (auto& [unused, stream] : streams_) {
-    if (stream.is_paused() && stream.has_partially_sent_message()) {
-      return false;
+    if (stream.IsReadyToBeReset()) {
+      return true;
     }
   }
-  return true;
+  return false;
+}
+std::vector<StreamID> RRSendQueue::GetStreamsReadyToBeReset() {
+  RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
+               return p.second.IsResetting();
+             }) == 0);
+  std::vector<StreamID> ready;
+  for (auto& [stream_id, stream] : streams_) {
+    if (stream.IsReadyToBeReset()) {
+      stream.SetAsResetting();
+      ready.push_back(stream_id);
+    }
+  }
+  return ready;
 }
 
 void RRSendQueue::CommitResetStreams() {
+  RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
+               return p.second.IsResetting();
+             }) > 0);
   for (auto& [unused, stream] : streams_) {
-    if (stream.is_paused()) {
+    if (stream.IsResetting()) {
       stream.Reset();
     }
   }
@@ -409,8 +464,13 @@
 }
 
 void RRSendQueue::RollbackResetStreams() {
+  RTC_DCHECK(absl::c_count_if(streams_, [](const auto& p) {
+               return p.second.IsResetting();
+             }) > 0);
   for (auto& [unused, stream] : streams_) {
-    stream.Resume();
+    if (stream.IsResetting()) {
+      stream.Resume();
+    }
   }
   RTC_DCHECK(IsConsistent());
 }
@@ -455,6 +515,7 @@
   return streams_
       .emplace(stream_id,
                OutgoingStream(
+                   stream_id,
                    [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                    total_buffered_amount_))
       .first->second;
@@ -482,6 +543,7 @@
        state.tx.streams) {
     StreamID stream_id(state_stream.id);
     streams_.emplace(stream_id, OutgoingStream(
+                                    stream_id,
                                     [this, stream_id]() {
                                       on_buffered_amount_low_(stream_id);
                                     },
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 6da585d..57a43cc 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -15,6 +15,7 @@
 #include <map>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "absl/algorithm/container.h"
 #include "absl/strings/string_view.h"
@@ -65,8 +66,9 @@
   bool Discard(IsUnordered unordered,
                StreamID stream_id,
                MID message_id) override;
-  void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) override;
-  bool CanResetStreams() const override;
+  void PrepareResetStream(StreamID streams) override;
+  bool HasStreamsReadyToBeReset() const override;
+  std::vector<StreamID> GetStreamsReadyToBeReset() override;
   void CommitResetStreams() override;
   void RollbackResetStreams() override;
   void Reset() override;
@@ -108,16 +110,20 @@
   // Per-stream information.
   class OutgoingStream {
    public:
-    explicit OutgoingStream(
+    OutgoingStream(
+        StreamID stream_id,
         std::function<void()> on_buffered_amount_low,
         ThresholdWatcher& total_buffered_amount,
         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
-        : next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
+        : stream_id_(stream_id),
+          next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
           next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
           next_ssn_(SSN(state ? state->next_ssn : 0)),
           buffered_amount_(std::move(on_buffered_amount_low)),
           total_buffered_amount_(total_buffered_amount) {}
 
+    StreamID stream_id() const { return stream_id_; }
+
     // Enqueues a message to this stream.
     void Add(DcSctpMessage message,
              TimeMs expires_at,
@@ -137,9 +143,18 @@
     void Pause();
 
     // Resumes a paused stream.
-    void Resume() { is_paused_ = false; }
+    void Resume();
 
-    bool is_paused() const { return is_paused_; }
+    bool IsReadyToBeReset() const {
+      return pause_state_ == PauseState::kPaused;
+    }
+
+    bool IsResetting() const { return pause_state_ == PauseState::kResetting; }
+
+    void SetAsResetting() {
+      RTC_DCHECK(pause_state_ == PauseState::kPaused);
+      pause_state_ = PauseState::kResetting;
+    }
 
     // Resets this stream, meaning MIDs and SSNs are set to zero.
     void Reset();
@@ -155,6 +170,26 @@
         DcSctpSocketHandoverState::OutgoingStream& state) const;
 
    private:
+    // Streams are paused before they can be reset. To reset a stream, the
+    // socket sends an outgoing stream reset command with the TSN of the last
+    // fragment of the last message, so that receivers and senders can agree on
+    // when it stopped. And if the send queue is in the middle of sending a
+    // message, and without fragments not yet sent and without TSNs allocated to
+    // them, it will keep sending data until that message has ended.
+    enum class PauseState {
+      // The stream is not paused, and not scheduled to be reset.
+      kNotPaused,
+      // The stream has requested to be reset/paused but is still producing
+      // fragments of a message that hasn't ended yet. When it does, it will
+      // transition to the `kPaused` state.
+      kPending,
+      // The stream is fully paused and can be reset.
+      kPaused,
+      // The stream has been added to an outgoing stream reset request and a
+      // response from the peer hasn't been received yet.
+      kResetting,
+    };
+
     // An enqueued message and metadata.
     struct Item {
       explicit Item(DcSctpMessage msg,
@@ -182,8 +217,8 @@
 
     bool IsConsistent() const;
 
-    // Streams are pause when they are about to be reset.
-    bool is_paused_ = false;
+    const StreamID stream_id_;
+    PauseState pause_state_ = PauseState::kNotPaused;
     // MIDs are different for unordered and ordered messages sent on a stream.
     MID next_unordered_mid_;
     MID next_ordered_mid_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index a93c4a3..fbbce58 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -26,6 +26,7 @@
 namespace dcsctp {
 namespace {
 using ::testing::SizeIs;
+using ::testing::UnorderedElementsAre;
 
 constexpr TimeMs kNow = TimeMs(0);
 constexpr StreamID kStreamID(1);
@@ -252,10 +253,13 @@
   buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
   EXPECT_EQ(buf_.total_buffered_amount(), 8u);
 
-  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+  buf_.PrepareResetStream(StreamID(1));
   EXPECT_EQ(buf_.total_buffered_amount(), 5u);
+
+  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+              UnorderedElementsAre(StreamID(1)));
   buf_.CommitResetStreams();
-  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)}));
+  buf_.PrepareResetStream(StreamID(2));
   EXPECT_EQ(buf_.total_buffered_amount(), 0u);
 }
 
@@ -270,21 +274,27 @@
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
 
-  StreamID stream_ids[] = {StreamID(1)};
-  buf_.PrepareResetStreams(stream_ids);
+  buf_.PrepareResetStream(StreamID(1));
   EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
 }
 
 TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
   std::vector<uint8_t> payload(50);
 
-  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+  buf_.PrepareResetStream(StreamID(1));
   EXPECT_EQ(buf_.total_buffered_amount(), 0u);
 
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
   EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
 
   EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+
+  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+              UnorderedElementsAre(StreamID(1)));
+
+  EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
+
   buf_.CommitResetStreams();
   EXPECT_EQ(buf_.total_buffered_amount(), payload.size());
 
@@ -309,8 +319,7 @@
   EXPECT_EQ(buf_.total_buffered_amount(), 2 * kPayloadSize - kFragmentSize);
 
   // This will stop the second message from being sent.
-  StreamID stream_ids[] = {StreamID(1)};
-  buf_.PrepareResetStreams(stream_ids);
+  buf_.PrepareResetStream(StreamID(1));
   EXPECT_EQ(buf_.total_buffered_amount(), 1 * kPayloadSize - kFragmentSize);
 
   // Should still produce fragments until end of message.
@@ -340,13 +349,14 @@
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.ssn, SSN(1));
 
-  StreamID stream_ids[] = {StreamID(1)};
-  buf_.PrepareResetStreams(stream_ids);
+  buf_.PrepareResetStream(StreamID(1));
 
   // Buffered
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
 
-  EXPECT_TRUE(buf_.CanResetStreams());
+  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+              UnorderedElementsAre(StreamID(1)));
   buf_.CommitResetStreams();
 
   absl::optional<SendQueue::DataToSend> chunk_three =
@@ -373,14 +383,16 @@
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
   EXPECT_EQ(chunk_two->data.ssn, SSN(0));
 
-  StreamID stream_ids[] = {StreamID(3)};
-  buf_.PrepareResetStreams(stream_ids);
+  buf_.PrepareResetStream(StreamID(3));
 
   // Send two more messages - SID 3 will buffer, SID 1 will send.
   buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
   buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
 
-  EXPECT_TRUE(buf_.CanResetStreams());
+  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+              UnorderedElementsAre(StreamID(3)));
+
   buf_.CommitResetStreams();
 
   absl::optional<SendQueue::DataToSend> chunk_three =
@@ -412,12 +424,14 @@
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.ssn, SSN(1));
 
-  buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
+  buf_.PrepareResetStream(StreamID(1));
 
   // Buffered
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
 
-  EXPECT_TRUE(buf_.CanResetStreams());
+  EXPECT_TRUE(buf_.HasStreamsReadyToBeReset());
+  EXPECT_THAT(buf_.GetStreamsReadyToBeReset(),
+              UnorderedElementsAre(StreamID(1)));
   buf_.RollbackResetStreams();
 
   absl::optional<SendQueue::DataToSend> chunk_three =
diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h
index a821d20..b2e5a9d 100644
--- a/net/dcsctp/tx/send_queue.h
+++ b/net/dcsctp/tx/send_queue.h
@@ -67,11 +67,11 @@
                        StreamID stream_id,
                        MID message_id) = 0;
 
-  // Prepares the streams to be reset. This is used to close a WebRTC data
+  // Prepares the stream to be reset. This is used to close a WebRTC data
   // channel and will be signaled to the other side.
   //
   // Concretely, it discards all whole (not partly sent) messages in the given
-  // streams and pauses those streams so that future added messages aren't
+  // stream and pauses that stream so that future added messages aren't
   // produced until `ResumeStreams` is called.
   //
   // TODO(boivie): Investigate if it really should discard any message at all.
@@ -82,24 +82,28 @@
   // reset, and paused while they are resetting. This is the first part of the
   // two-phase commit protocol to reset streams, where the caller completes the
   // procedure by either calling `CommitResetStreams` or `RollbackResetStreams`.
-  virtual void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) = 0;
+  virtual void PrepareResetStream(StreamID stream_id) = 0;
 
-  // Returns true if all non-discarded messages during `PrepareResetStreams`
-  // (which are those that was partially sent before that method was called)
-  // have been sent.
-  virtual bool CanResetStreams() const = 0;
+  // Indicates if there are any streams that are ready to be reset.
+  virtual bool HasStreamsReadyToBeReset() const = 0;
 
-  // Called to commit to reset the streams provided to `PrepareResetStreams`.
-  // It will reset the stream sequence numbers (SSNs) and message identifiers
-  // (MIDs) and resume the paused streams.
+  // Returns a list of streams that are ready to be included in an outgoing
+  // stream reset request. Any streams that are returned here must be included
+  // in an outgoing stream reset request, and there must not be concurrent
+  // requests. Before calling this method again, you must have called
+  virtual std::vector<StreamID> GetStreamsReadyToBeReset() = 0;
+
+  // Called to commit to reset the streams returned by
+  // `GetStreamsReadyToBeReset`. It will reset the stream sequence numbers
+  // (SSNs) and message identifiers (MIDs) and resume the paused streams.
   virtual void CommitResetStreams() = 0;
 
-  // Called to abort the resetting of streams provided to `PrepareResetStreams`.
-  // Will resume the paused streams without resetting the stream sequence
-  // numbers (SSNs) or message identifiers (MIDs). Note that the non-partial
-  // messages that were discarded when calling `PrepareResetStreams` will not be
-  // recovered, to better match the intention from the sender to "close the
-  // channel".
+  // Called to abort the resetting of streams returned by
+  // `GetStreamsReadyToBeReset`. Will resume the paused streams without
+  // resetting the stream sequence numbers (SSNs) or message identifiers (MIDs).
+  // Note that the non-partial messages that were discarded when calling
+  // `PrepareResetStreams` will not be recovered, to better match the intention
+  // from the sender to "close the channel".
   virtual void RollbackResetStreams() = 0;
 
   // Resets all message identifier counters (MID, SSN) and makes all partially