dcsctp: Only reset paused streams when peer acks

When a single stream is reset, and an outgoing SSN reset request is sent
and later acked by the peer sending a reconfiguration response with
status=Performed, the sender should unpause the paused stream and reset
the SSNs of that (ordered) stream. But only the single stream that was
paused, and not all streams. In this scenario, dcSCTP would - when the
peer acked the SSN reset request - reset the SSN of all streams.

This was found by orphis@webrtc.org using a data channel test
application. The peer, if it's a usrsctp client, will ABORT with
PROTOCOL_VIOLATION as it has already seen that SSN on that stream but
with a different TSN.

This bug was introduced when implementing the Round Robin scheduler in
https://webrtc-review.googlesource.com/c/src/+/219682. The FCFS
scheduler prior to this change was implemented correctly.

Bug: webrtc:12952
Change-Id: I3ea144a1df303145f69a5b03aada7f448c8c8163
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225266
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34436}
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index e5db12c..7a7daac 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -60,6 +60,33 @@
 constexpr size_t kLargeMessageSize = DcSctpOptions::kMaxSafeMTUSize * 20;
 static constexpr size_t kSmallMessageSize = 10;
 
+MATCHER_P(HasDataChunkWithStreamId, stream_id, "") {
+  absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
+  if (!packet.has_value()) {
+    *result_listener << "data didn't parse as an SctpPacket";
+    return false;
+  }
+
+  if (packet->descriptors()[0].type != DataChunk::kType) {
+    *result_listener << "the first chunk in the packet is not a data chunk";
+    return false;
+  }
+
+  absl::optional<DataChunk> dc =
+      DataChunk::Parse(packet->descriptors()[0].data);
+  if (!dc.has_value()) {
+    *result_listener << "The first chunk didn't parse as a data chunk";
+    return false;
+  }
+
+  if (dc->stream_id() != stream_id) {
+    *result_listener << "the stream_id is " << *dc->stream_id();
+    return false;
+  }
+
+  return true;
+}
+
 MATCHER_P(HasDataChunkWithSsn, ssn, "") {
   absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
   if (!packet.has_value()) {
@@ -888,6 +915,84 @@
   sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
 }
 
+TEST_F(DcSctpSocketTest, ResetStreamWillOnlyResetTheRequestedStreams) {
+  ConnectSockets();
+
+  std::vector<uint8_t> payload(options_.mtu - 100);
+
+  // Send two ordered messages on SID 1
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
+
+  auto packet1 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
+  EXPECT_THAT(packet1, HasDataChunkWithSsn(SSN(0)));
+  sock_z_.ReceivePacket(packet1);
+
+  auto packet2 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet1, HasDataChunkWithStreamId(StreamID(1)));
+  EXPECT_THAT(packet2, HasDataChunkWithSsn(SSN(1)));
+  sock_z_.ReceivePacket(packet2);
+
+  // Handle SACK
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+
+  // Do the same, for SID 3
+  sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
+  sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
+  auto packet3 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet3, HasDataChunkWithStreamId(StreamID(3)));
+  EXPECT_THAT(packet3, HasDataChunkWithSsn(SSN(0)));
+  sock_z_.ReceivePacket(packet3);
+  auto packet4 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet4, HasDataChunkWithStreamId(StreamID(3)));
+  EXPECT_THAT(packet4, HasDataChunkWithSsn(SSN(1)));
+  sock_z_.ReceivePacket(packet4);
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+
+  // Receive all messages.
+  absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg1.has_value());
+  EXPECT_EQ(msg1->stream_id(), StreamID(1));
+
+  absl::optional<DcSctpMessage> msg2 = cb_z_.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg2.has_value());
+  EXPECT_EQ(msg2->stream_id(), StreamID(1));
+
+  absl::optional<DcSctpMessage> msg3 = cb_z_.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg3.has_value());
+  EXPECT_EQ(msg3->stream_id(), StreamID(3));
+
+  absl::optional<DcSctpMessage> msg4 = cb_z_.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg4.has_value());
+  EXPECT_EQ(msg4->stream_id(), StreamID(3));
+
+  // Reset SID 1. This will directly send a RE-CONFIG.
+  sock_a_.ResetStreams(std::vector<StreamID>({StreamID(3)}));
+  // RE-CONFIG, req
+  sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
+  // RE-CONFIG, resp
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+
+  // Send a message on SID 1 and 3 - SID 1 should not be reset, but 3 should.
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), {});
+
+  sock_a_.Send(DcSctpMessage(StreamID(3), PPID(53), payload), {});
+
+  auto packet5 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet5, HasDataChunkWithStreamId(StreamID(1)));
+  EXPECT_THAT(packet5, HasDataChunkWithSsn(SSN(2)));  // Unchanged.
+  sock_z_.ReceivePacket(packet5);
+
+  auto packet6 = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet6, HasDataChunkWithStreamId(StreamID(3)));
+  EXPECT_THAT(packet6, HasDataChunkWithSsn(SSN(0)));  // Reset.
+  sock_z_.ReceivePacket(packet6);
+
+  // Handle SACK
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+}
+
 TEST_F(DcSctpSocketTest, OnePeerReconnects) {
   ConnectSockets();
 
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 4bfbaf7..254214e 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -373,7 +373,11 @@
 }
 
 void RRSendQueue::CommitResetStreams() {
-  Reset();
+  for (auto& stream_entry : streams_) {
+    if (stream_entry.second.is_paused()) {
+      stream_entry.second.Reset();
+    }
+  }
   RTC_DCHECK(IsConsistent());
 }
 
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 682c16a..4250277 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -354,6 +354,47 @@
   EXPECT_EQ(chunk_three->data.ssn, SSN(0));
 }
 
+TEST_F(RRSendQueueTest, CommittingResetsSSNForPausedStreamsOnly) {
+  std::vector<uint8_t> payload(50);
+
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+  buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
+
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
+  ASSERT_TRUE(chunk_one.has_value());
+  EXPECT_EQ(chunk_one->data.stream_id, StreamID(1));
+  EXPECT_EQ(chunk_one->data.ssn, SSN(0));
+
+  absl::optional<SendQueue::DataToSend> chunk_two =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
+  ASSERT_TRUE(chunk_two.has_value());
+  EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
+  EXPECT_EQ(chunk_two->data.ssn, SSN(0));
+
+  StreamID stream_ids[] = {StreamID(3)};
+  buf_.PrepareResetStreams(stream_ids);
+
+  // Send two more messages - SID 3 will buffer, SID 1 will send.
+  buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
+  buf_.Add(kNow, DcSctpMessage(StreamID(3), kPPID, payload));
+
+  EXPECT_TRUE(buf_.CanResetStreams());
+  buf_.CommitResetStreams();
+
+  absl::optional<SendQueue::DataToSend> chunk_three =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
+  ASSERT_TRUE(chunk_three.has_value());
+  EXPECT_EQ(chunk_three->data.stream_id, StreamID(1));
+  EXPECT_EQ(chunk_three->data.ssn, SSN(1));
+
+  absl::optional<SendQueue::DataToSend> chunk_four =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
+  ASSERT_TRUE(chunk_four.has_value());
+  EXPECT_EQ(chunk_four->data.stream_id, StreamID(3));
+  EXPECT_EQ(chunk_four->data.ssn, SSN(0));
+}
+
 TEST_F(RRSendQueueTest, RollBackResumesSSN) {
   std::vector<uint8_t> payload(50);