dcsctp: Only process meaningful FORWARD-TSN

Similar to change I602a8552a9a4c853684fcf105309ec3d8073f2c2, which
ensured that only new DATA chunks would be processed by the reassembly
queue by utilizing the data tracker, the same is done for FORWARD-TSN
chunks.

By having the data tracker gate keeping what is provided to the
reassembly queue, the reassembly queue can be simplified as well, which
is an added bonus, by removing last_assembled_tsn_watermark_ and
reassembled_messages_ as those were protecting the queue from
re-delivering messages it had already delivered, but as now the data
tracker would ensure that it wouldn't re-process DATA/FORWARD-TSNs, that
would have the same effect. In this CL, we will still update those
variables and save to the handover state, but not actually read from
them, and then when this change has been rolled out on the servers, I
can remove the variables as well.

The core change is to move validation from ReassemblyQueue::Handle
to DataTracker::HandleForwardTsn.

Some tests have been moved/replicated into data_tracker_test.cc to
ensure that it catches the issues that the reassembly queue did earlier.

Bug: webrtc:14600
Change-Id: I75c1d5911185d594f73c8b1e6bcf776e88f5b7c7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/321603
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40856}
diff --git a/net/dcsctp/rx/data_tracker.cc b/net/dcsctp/rx/data_tracker.cc
index 1f2e43f..70b7587 100644
--- a/net/dcsctp/rx/data_tracker.cc
+++ b/net/dcsctp/rx/data_tracker.cc
@@ -214,7 +214,7 @@
   return !is_duplicate;
 }
 
-void DataTracker::HandleForwardTsn(TSN new_cumulative_ack) {
+bool DataTracker::HandleForwardTsn(TSN new_cumulative_ack) {
   // ForwardTSN is sent to make the receiver (this socket) "forget" about partly
   // received (or not received at all) data, up until `new_cumulative_ack`.
 
@@ -232,7 +232,7 @@
     // indicate the previous SACK was lost in the network."
     UpdateAckState(AckState::kImmediate,
                    "FORWARD_TSN new_cumulative_tsn was behind");
-    return;
+    return false;
   }
 
   // https://tools.ietf.org/html/rfc3758#section-3.6
@@ -271,6 +271,7 @@
     UpdateAckState(AckState::kImmediate,
                    "received FORWARD_TSN when already delayed");
   }
+  return true;
 }
 
 SackChunk DataTracker::CreateSelectiveAck(size_t a_rwnd) {
diff --git a/net/dcsctp/rx/data_tracker.h b/net/dcsctp/rx/data_tracker.h
index e07e1e3..62a1232 100644
--- a/net/dcsctp/rx/data_tracker.h
+++ b/net/dcsctp/rx/data_tracker.h
@@ -74,8 +74,9 @@
   // Called at the end of processing an SCTP packet.
   void ObservePacketEnd();
 
-  // Called for incoming FORWARD-TSN/I-FORWARD-TSN chunks
-  void HandleForwardTsn(TSN new_cumulative_ack);
+  // Called for incoming FORWARD-TSN/I-FORWARD-TSN chunks. Indicates if the
+  // chunk had any effect.
+  bool HandleForwardTsn(TSN new_cumulative_ack);
 
   // Indicates if a SACK should be sent. There may be other reasons to send a
   // SACK, but if this function indicates so, it should be sent as soon as
diff --git a/net/dcsctp/rx/data_tracker_test.cc b/net/dcsctp/rx/data_tracker_test.cc
index f74dd6e..07192fd 100644
--- a/net/dcsctp/rx/data_tracker_test.cc
+++ b/net/dcsctp/rx/data_tracker_test.cc
@@ -735,5 +735,54 @@
   EXPECT_FALSE(tracker_->ShouldSendAck());
   EXPECT_TRUE(timer_->is_running());
 }
+
+TEST_F(DataTrackerTest, DoesNotAcceptDataBeforeForwardTsn) {
+  Observer({12, 13, 14, 15, 17});
+  tracker_->ObservePacketEnd();
+
+  tracker_->HandleForwardTsn(TSN(13));
+
+  EXPECT_FALSE(tracker_->Observe(TSN(11)));
+}
+
+TEST_F(DataTrackerTest, DoesNotAcceptDataAtForwardTsn) {
+  Observer({12, 13, 14, 15, 17});
+  tracker_->ObservePacketEnd();
+
+  tracker_->HandleForwardTsn(TSN(16));
+
+  EXPECT_FALSE(tracker_->Observe(TSN(16)));
+}
+
+TEST_F(DataTrackerTest, DoesNotAcceptDataBeforeCumAckTsn) {
+  EXPECT_EQ(kInitialTSN, TSN(11));
+  EXPECT_FALSE(tracker_->Observe(TSN(10)));
+}
+
+TEST_F(DataTrackerTest, DoesNotAcceptContiguousDuplicateData) {
+  EXPECT_EQ(kInitialTSN, TSN(11));
+  EXPECT_TRUE(tracker_->Observe(TSN(11)));
+  EXPECT_FALSE(tracker_->Observe(TSN(11)));
+  EXPECT_TRUE(tracker_->Observe(TSN(12)));
+  EXPECT_FALSE(tracker_->Observe(TSN(12)));
+  EXPECT_FALSE(tracker_->Observe(TSN(11)));
+  EXPECT_FALSE(tracker_->Observe(TSN(10)));
+}
+
+TEST_F(DataTrackerTest, DoesNotAcceptGapsWithDuplicateData) {
+  EXPECT_EQ(kInitialTSN, TSN(11));
+  EXPECT_TRUE(tracker_->Observe(TSN(11)));
+  EXPECT_FALSE(tracker_->Observe(TSN(11)));
+
+  EXPECT_TRUE(tracker_->Observe(TSN(14)));
+  EXPECT_FALSE(tracker_->Observe(TSN(14)));
+
+  EXPECT_TRUE(tracker_->Observe(TSN(13)));
+  EXPECT_FALSE(tracker_->Observe(TSN(13)));
+
+  EXPECT_TRUE(tracker_->Observe(TSN(12)));
+  EXPECT_FALSE(tracker_->Observe(TSN(12)));
+}
+
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc
index 2cc90a6..b5ab087 100644
--- a/net/dcsctp/rx/reassembly_queue.cc
+++ b/net/dcsctp/rx/reassembly_queue.cc
@@ -79,13 +79,6 @@
 
   UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
 
-  if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
-      delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
-    RTC_DLOG(LS_VERBOSE) << log_prefix_
-                         << "Chunk has already been delivered - skipping";
-    return;
-  }
-
   // If a stream reset has been received with a "sender's last assigned tsn" in
   // the future, the socket is in "deferred reset processing" mode and must
   // buffer chunks until it's exited.
@@ -218,15 +211,7 @@
                        << ", payload=" << message.payload().size() << " bytes";
 
   for (const UnwrappedTSN tsn : tsns) {
-    if (tsn <= last_assembled_tsn_watermark_) {
-      // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
-      // invalid SSNs, allowing ordered messages to stay in the queue that
-      // should've been discarded.
-      RTC_DLOG(LS_VERBOSE)
-          << log_prefix_
-          << "Message is built from fragments already seen - skipping";
-      return;
-    } else if (tsn == last_assembled_tsn_watermark_.next_value()) {
+    if (tsn == last_assembled_tsn_watermark_.next_value()) {
       // Update watermark, or insert into delivered_tsns_
       last_assembled_tsn_watermark_.Increment();
     } else {
diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc
index 549bc6f..d7b9c1d 100644
--- a/net/dcsctp/rx/reassembly_queue_test.cc
+++ b/net/dcsctp/rx/reassembly_queue_test.cc
@@ -197,11 +197,6 @@
   reasm.Handle(ForwardTsnChunk(TSN(13), {}));
   EXPECT_EQ(reasm.queued_bytes(), 3u);
 
-  // The lost chunk comes, but too late.
-  reasm.Add(TSN(11), gen_.Unordered({2}));
-  EXPECT_FALSE(reasm.HasMessages());
-  EXPECT_EQ(reasm.queued_bytes(), 3u);
-
   // The second lost chunk comes, message is assembled.
   reasm.Add(TSN(16), gen_.Unordered({7}));
   EXPECT_TRUE(reasm.HasMessages());
@@ -256,52 +251,6 @@
               ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
 }
 
-TEST_F(ReassemblyQueueTest, ShouldntDeliverMessagesBeforeInitialTsn) {
-  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
-  reasm.Add(TSN(5), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 0u);
-  EXPECT_FALSE(reasm.HasMessages());
-}
-
-TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessages) {
-  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
-  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 0u);
-  EXPECT_TRUE(reasm.HasMessages());
-  EXPECT_THAT(reasm.FlushMessages(),
-              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
-  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 0u);
-  EXPECT_FALSE(reasm.HasMessages());
-}
-
-TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessagesReallyUnordered) {
-  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
-  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
-  EXPECT_EQ(reasm.queued_bytes(), 4u);
-
-  EXPECT_FALSE(reasm.HasMessages());
-
-  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 4u);
-  EXPECT_TRUE(reasm.HasMessages());
-
-  EXPECT_THAT(reasm.FlushMessages(),
-              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
-  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 4u);
-  EXPECT_FALSE(reasm.HasMessages());
-}
-
-TEST_F(ReassemblyQueueTest, ShouldntDeliverBeforeForwardedTsn) {
-  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
-  reasm.Handle(ForwardTsnChunk(TSN(12), {}));
-
-  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
-  EXPECT_EQ(reasm.queued_bytes(), 0u);
-  EXPECT_FALSE(reasm.HasMessages());
-}
-
 TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenDeliveredTsnsHaveGap) {
   ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
   reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
@@ -400,22 +349,6 @@
   EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
 }
 
-TEST_F(ReassemblyQueueTest, HandleInconsistentForwardTSN) {
-  // Found when fuzzing.
-  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
-  // Add TSN=43, SSN=7. Can't be reassembled as previous SSNs aren't known.
-  reasm.Add(TSN(43), Data(kStreamID, SSN(7), MID(0), FSN(0), kPPID,
-                          std::vector<uint8_t>(10), Data::IsBeginning(true),
-                          Data::IsEnd(true), IsUnordered(false)));
-
-  // Invalid, as TSN=44 have to have SSN>=7, but peer says 6.
-  reasm.Handle(ForwardTsnChunk(
-      TSN(44), {ForwardTsnChunk::SkippedStream(kStreamID, SSN(6))}));
-
-  // Don't assemble SSN=7, as that TSN is skipped.
-  EXPECT_FALSE(reasm.HasMessages());
-}
-
 TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) {
   ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
                         /*use_message_interleaving=*/true);
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 6101007..2e29a5a 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -1709,8 +1709,9 @@
                        "Received a FORWARD_TSN without announced peer support");
     return;
   }
-  tcb_->data_tracker().HandleForwardTsn(chunk.new_cumulative_tsn());
-  tcb_->reassembly_queue().Handle(chunk);
+  if (tcb_->data_tracker().HandleForwardTsn(chunk.new_cumulative_tsn())) {
+    tcb_->reassembly_queue().Handle(chunk);
+  }
 
   // A forward TSN - for ordered streams - may allow messages to be
   // delivered.