dcsctp: Abandon chunks consistently

The previous logic to abandon chunks when partial reliability was used
was a bit too eager and trigger happy.

 * Chunks with limited retransmissions should only be abandoned when a
   chunk is really considered lost. It should follow the same rules as
   for retransmitting chunks - that it must be nacked three times or
   due to a T3-RTX expiration. Before this change, a single SACK not
   referencing it would be enough to abandon it. This resulted in a lot
   of unnecessary sent FORWARD-TSN and undelivered messages - especially
   if running with zero retransmissions.

   The logic to expire chunks by limited retransmissions will now only
   be applied when a chunk is actually nacked.

 * The second partial reliability trigger - expiration time - wasn't
   evaluated when producing a middle chunk of a larger message.

A number of test cases were added and updated as chunks will now be
abandoned immediately instead of first scheduled for retransmission and
later abandoned.

Bug: webrtc:12961
Change-Id: I0ae17b2672568bdbdc32073a99d4c24b09ff5fe9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225548
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34458}
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index 4ab4249..7ca3d9b 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -91,6 +91,33 @@
   return true;
 }
 
+MATCHER_P(HasDataChunkWithPPID, ppid, "") {
+  absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
+  if (!packet.has_value()) {
+    *result_listener << "data didn't parse as an SctpPacket";
+    return false;
+  }
+
+  if (packet->descriptors()[0].type != DataChunk::kType) {
+    *result_listener << "the first chunk in the packet is not a data chunk";
+    return false;
+  }
+
+  absl::optional<DataChunk> dc =
+      DataChunk::Parse(packet->descriptors()[0].data);
+  if (!dc.has_value()) {
+    *result_listener << "The first chunk didn't parse as a data chunk";
+    return false;
+  }
+
+  if (dc->ppid() != ppid) {
+    *result_listener << "the ppid is " << *dc->ppid();
+    return false;
+  }
+
+  return true;
+}
+
 MATCHER_P(HasDataChunkWithSsn, ssn, "") {
   absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
   if (!packet.has_value()) {
@@ -1049,7 +1076,14 @@
   // Third DATA
   sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
 
-  // Handle SACK
+  // Handle SACK for first DATA
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+
+  // Handle delayed SACK for third DATA
+  AdvanceTime(options_.delayed_ack_max_timeout);
+  RunTimers();
+
+  // Handle SACK for second DATA
   sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
 
   // Now the missing data chunk will be marked as nacked, but it might still be
@@ -1065,11 +1099,7 @@
   // FORWARD-TSN (third)
   sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
 
-  // The receiver might have moved into delayed ack mode.
-  AdvanceTime(options_.rto_initial);
-  RunTimers();
-
-  // Handle SACK
+  // Which will trigger a SACK
   sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
 
   absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
@@ -1084,6 +1114,78 @@
   EXPECT_FALSE(msg3.has_value());
 }
 
+TEST_F(DcSctpSocketTest, SendManyFragmentedMessagesWithLimitedRtx) {
+  ConnectSockets();
+
+  SendOptions send_options;
+  send_options.unordered = IsUnordered(true);
+  send_options.max_retransmissions = 0;
+  std::vector<uint8_t> payload(options_.mtu * 2 - 100 /* margin */);
+  // Sending first message
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(51), payload), send_options);
+  // Sending second message
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(52), payload), send_options);
+  // Sending third message
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), send_options);
+  // Sending fourth message
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(54), payload), send_options);
+
+  // First DATA, first fragment
+  std::vector<uint8_t> packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
+  sock_z_.ReceivePacket(std::move(packet));
+
+  // First DATA, second fragment (lost)
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
+
+  // Second DATA, first fragment
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
+  sock_z_.ReceivePacket(std::move(packet));
+
+  // Second DATA, second fragment (lost)
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
+  EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+
+  // Third DATA, first fragment
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
+  EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+  sock_z_.ReceivePacket(std::move(packet));
+
+  // Third DATA, second fragment (lost)
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
+  EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+
+  // Fourth DATA, first fragment
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
+  EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+  sock_z_.ReceivePacket(std::move(packet));
+
+  // Fourth DATA, second fragment
+  packet = cb_a_.ConsumeSentPacket();
+  EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
+  EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+  sock_z_.ReceivePacket(std::move(packet));
+
+  ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
+
+  // Let the RTX timer expire, and exchange FORWARD-TSN/SACKs
+  AdvanceTime(options_.rto_initial);
+  RunTimers();
+  ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
+
+  absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
+  ASSERT_TRUE(msg1.has_value());
+  EXPECT_EQ(msg1->ppid(), PPID(54));
+
+  ASSERT_FALSE(cb_z_.ConsumeReceivedMessage().has_value());
+}
+
 struct FakeChunkConfig : ChunkConfig {
   static constexpr int kType = 0x49;
   static constexpr size_t kHeaderSize = 4;
diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc
index ef2f0e3..51bb65a 100644
--- a/net/dcsctp/tx/retransmission_queue.cc
+++ b/net/dcsctp/tx/retransmission_queue.cc
@@ -188,16 +188,8 @@
     for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
          iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
       if (iter->first <= max_tsn_to_nack) {
-        if (iter->second.is_outstanding()) {
-          outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
-        }
-
-        if (iter->second.Nack()) {
-          ack_info.has_packet_loss = true;
-          to_be_retransmitted_.insert(iter->first);
-          RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
-                               << " marked for retransmission";
-        }
+        ack_info.has_packet_loss =
+            NackItem(iter->first, iter->second, /*retransmit_now=*/false);
       }
     }
     prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
@@ -499,20 +491,11 @@
   // T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
   // marked for retransmission and sent as soon as cwnd allows (normally, when a
   // SACK arrives)."
-  int count = 0;
   for (auto& elem : outstanding_data_) {
     UnwrappedTSN tsn = elem.first;
     TxData& item = elem.second;
     if (!item.is_acked()) {
-      if (item.is_outstanding()) {
-        outstanding_bytes_ -= GetSerializedChunkSize(item.data());
-      }
-      if (item.Nack(/*retransmit_now=*/true)) {
-        to_be_retransmitted_.insert(tsn);
-        RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
-                             << " will be retransmitted due to T3-RTX";
-        ++count;
-      }
+      NackItem(tsn, item, /*retransmit_now=*/true);
     }
   }
 
@@ -524,12 +507,33 @@
 
   RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
                     << " (" << old_cwnd << "), ssthresh=" << ssthresh_
-                    << ", rtx-packets=" << count << ", outstanding_bytes "
-                    << outstanding_bytes_ << " (" << old_outstanding_bytes
-                    << ")";
+                    << ", outstanding_bytes " << outstanding_bytes_ << " ("
+                    << old_outstanding_bytes << ")";
   RTC_DCHECK(IsConsistent());
 }
 
+bool RetransmissionQueue::NackItem(UnwrappedTSN tsn,
+                                   TxData& item,
+                                   bool retransmit_now) {
+  if (item.is_outstanding()) {
+    outstanding_bytes_ -= GetSerializedChunkSize(item.data());
+  }
+
+  switch (item.Nack(retransmit_now)) {
+    case TxData::NackAction::kNothing:
+      return false;
+    case TxData::NackAction::kRetransmit:
+      to_be_retransmitted_.insert(tsn);
+      RTC_DLOG(LS_VERBOSE) << log_prefix_ << *tsn.Wrap()
+                           << " marked for retransmission";
+      break;
+    case TxData::NackAction::kAbandon:
+      AbandonAllFor(item);
+      break;
+  }
+  return true;
+}
+
 std::vector<std::pair<TSN, Data>>
 RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
   std::vector<std::pair<TSN, Data>> result;
@@ -615,17 +619,35 @@
 
       UnwrappedTSN tsn = next_tsn_;
       next_tsn_.Increment();
-      to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone());
 
       // All chunks are always padded to be even divisible by 4.
       size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
       max_bytes -= chunk_size;
       outstanding_bytes_ += chunk_size;
       rwnd_ -= chunk_size;
-      outstanding_data_.emplace(
-          tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data),
-                                           chunk_opt->max_retransmissions, now,
-                                           chunk_opt->expires_at));
+      auto item_it =
+          outstanding_data_
+              .emplace(tsn,
+                       RetransmissionQueue::TxData(
+                           chunk_opt->data.Clone(),
+                           partial_reliability_ ? chunk_opt->max_retransmissions
+                                                : absl::nullopt,
+                           now,
+                           partial_reliability_ ? chunk_opt->expires_at
+                                                : absl::nullopt))
+              .first;
+
+      if (item_it->second.has_expired(now)) {
+        // No need to send it - it was expired when it was in the send
+        // queue.
+        RTC_DLOG(LS_VERBOSE)
+            << log_prefix_ << "Marking freshly produced chunk "
+            << *item_it->first.Wrap() << " and message "
+            << *item_it->second.data().message_id << " as expired";
+        AbandonAllFor(item_it->second);
+      } else {
+        to_be_sent.emplace_back(tsn.Wrap(), std::move(chunk_opt->data));
+      }
     }
   }
 
@@ -684,7 +706,7 @@
   if (!partial_reliability_) {
     return false;
   }
-  ExpireChunks(now);
+  ExpireOutstandingChunks(now);
   if (!outstanding_data_.empty()) {
     auto it = outstanding_data_.begin();
     return it->first == last_cumulative_tsn_ack_.next_value() &&
@@ -699,15 +721,22 @@
   should_be_retransmitted_ = false;
 }
 
-bool RetransmissionQueue::TxData::Nack(bool retransmit_now) {
+RetransmissionQueue::TxData::NackAction RetransmissionQueue::TxData::Nack(
+    bool retransmit_now) {
   ack_state_ = AckState::kNacked;
   ++nack_count_;
   if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) &&
       !is_abandoned_) {
-    should_be_retransmitted_ = true;
-    return true;
+    // Nacked enough times - it's considered lost.
+    if (!max_retransmissions_.has_value() ||
+        num_retransmissions_ < max_retransmissions_) {
+      should_be_retransmitted_ = true;
+      return NackAction::kRetransmit;
+    }
+    Abandon();
+    return NackAction::kAbandon;
   }
-  return false;
+  return NackAction::kNothing;
 }
 
 void RetransmissionQueue::TxData::Retransmit() {
@@ -724,33 +753,24 @@
 }
 
 bool RetransmissionQueue::TxData::has_expired(TimeMs now) const {
-  if (ack_state_ != AckState::kAcked && !is_abandoned_) {
-    if (max_retransmissions_.has_value() &&
-        num_retransmissions_ >= *max_retransmissions_) {
-      return true;
-    } else if (expires_at_.has_value() && *expires_at_ <= now) {
-      return true;
-    }
-  }
-  return false;
+  return expires_at_.has_value() && *expires_at_ <= now;
 }
 
-void RetransmissionQueue::ExpireChunks(TimeMs now) {
+void RetransmissionQueue::ExpireOutstandingChunks(TimeMs now) {
   for (const auto& elem : outstanding_data_) {
     UnwrappedTSN tsn = elem.first;
     const TxData& item = elem.second;
 
-    // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted
-    // can be expired easily. There is always a risk that a message is expired
-    // that was already received by the peer, but for which there haven't been
-    // a SACK received. But that's acceptable, and handled.
+    // Chunks that are nacked can be expired. Care should be taken not to expire
+    // unacked (in-flight) chunks as they might have been received, but the SACK
+    // is either delayed or in-flight and may be received later.
     if (item.is_abandoned()) {
       // Already abandoned.
-    } else if (item.has_expired(now)) {
-      RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
-                           << " and message " << *item.data().message_id
-                           << " as expired";
-      ExpireAllFor(item);
+    } else if (item.is_nacked() && item.has_expired(now)) {
+      RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking nacked chunk "
+                           << *tsn.Wrap() << " and message "
+                           << *item.data().message_id << " as expired";
+      AbandonAllFor(item);
     } else {
       // A non-expired chunk. No need to iterate any further.
       break;
@@ -758,7 +778,7 @@
   }
 }
 
-void RetransmissionQueue::ExpireAllFor(
+void RetransmissionQueue::AbandonAllFor(
     const RetransmissionQueue::TxData& item) {
   // Erase all remaining chunks from the producer, if any.
   if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h
index 7f5baf9..c5a6a04 100644
--- a/net/dcsctp/tx/retransmission_queue.h
+++ b/net/dcsctp/tx/retransmission_queue.h
@@ -143,6 +143,12 @@
   // its associated metadata.
   class TxData {
    public:
+    enum class NackAction {
+      kNothing,
+      kRetransmit,
+      kAbandon,
+    };
+
     explicit TxData(Data data,
                     absl::optional<size_t> max_retransmissions,
                     TimeMs time_sent,
@@ -160,9 +166,10 @@
     void Ack();
 
     // Nacks an item. If it has been nacked enough times, or if `retransmit_now`
-    // is set, it might be marked for retransmission, which is indicated by the
-    // return value.
-    bool Nack(bool retransmit_now = false);
+    // is set, it might be marked for retransmission. If the item has reached
+    // its max retransmission value, it will instead be abandoned. The action
+    // performed is indicated as return value.
+    NackAction Nack(bool retransmit_now = false);
 
     // Prepares the item to be retransmitted. Sets it as outstanding and
     // clears all nack counters.
@@ -173,6 +180,7 @@
 
     bool is_outstanding() const { return ack_state_ == AckState::kUnacked; }
     bool is_acked() const { return ack_state_ == AckState::kAcked; }
+    bool is_nacked() const { return ack_state_ == AckState::kNacked; }
     bool is_abandoned() const { return is_abandoned_; }
 
     // Indicates if this chunk should be retransmitted.
@@ -264,6 +272,14 @@
   // by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`.
   void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
 
+  // Helper method to nack an item and perform the correct operations given the
+  // action indicated when nacking an item (e.g. retransmitting or abandoning).
+  // The return value indicate if an action was performed, meaning that packet
+  // loss was detected and acted upon.
+  bool NackItem(UnwrappedTSN cumulative_tsn_ack,
+                TxData& item,
+                bool retransmit_now);
+
   // Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
   // as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
   void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
@@ -307,13 +323,13 @@
   // is running.
   void StartT3RtxTimerIfOutstandingData();
 
-  // Given the current time `now_ms`, expire chunks that have a limited
-  // lifetime.
-  void ExpireChunks(TimeMs now);
-  // Given that a message fragment, `item` has expired, expire all other
-  // fragments that share the same message - even never-before-sent fragments
-  // that are still in the SendQueue.
-  void ExpireAllFor(const RetransmissionQueue::TxData& item);
+  // Given the current time `now_ms`, expire and abandon outstanding (sent at
+  // least once) chunks that have a limited lifetime.
+  void ExpireOutstandingChunks(TimeMs now);
+  // Given that a message fragment, `item` has been abandoned, abandon all other
+  // fragments that share the same message - both never-before-sent fragments
+  // that are still in the SendQueue and outstanding chunks.
+  void AbandonAllFor(const RetransmissionQueue::TxData& item);
 
   // Returns the current congestion control algorithm phase.
   CongestionAlgorithmPhase phase() const {
diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc
index e02b111..4aa76d6 100644
--- a/net/dcsctp/tx/retransmission_queue_test.cc
+++ b/net/dcsctp/tx/retransmission_queue_test.cc
@@ -378,14 +378,14 @@
                           Pair(TSN(10), State::kInFlight)));
 
   // Will force chunks to be retransmitted
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+      .Times(1);
+
   queue.HandleT3RtxTimerExpiry();
 
   EXPECT_THAT(queue.GetChunkStatesForTesting(),
               ElementsAre(Pair(TSN(9), State::kAcked),  //
-                          Pair(TSN(10), State::kToBeRetransmitted)));
-
-  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
-      .Times(1);
+                          Pair(TSN(10), State::kAbandoned)));
 
   EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
 
@@ -438,9 +438,9 @@
   EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
 
   // Retransmission 4 - not allowed.
-  queue.HandleT3RtxTimerExpiry();
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
       .Times(1);
+  queue.HandleT3RtxTimerExpiry();
   EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
   EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
 
@@ -521,16 +521,11 @@
 
   // Chunk 10 is acked, but the remaining are lost
   queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
-  queue.HandleT3RtxTimerExpiry();
-
-  EXPECT_THAT(queue.GetChunkStatesForTesting(),
-              ElementsAre(Pair(TSN(10), State::kAcked),              //
-                          Pair(TSN(11), State::kToBeRetransmitted),  //
-                          Pair(TSN(12), State::kToBeRetransmitted)));
 
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
       .WillOnce(Return(true));
-  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+  queue.HandleT3RtxTimerExpiry();
 
   // NOTE: The TSN=13 represents the end fragment.
   EXPECT_THAT(queue.GetChunkStatesForTesting(),
@@ -539,6 +534,8 @@
                           Pair(TSN(12), State::kAbandoned),  //
                           Pair(TSN(13), State::kAbandoned)));
 
+  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
   ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
   EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
   EXPECT_THAT(forward_tsn.skipped_streams(),
@@ -579,23 +576,19 @@
 
   // Chunk 10 is acked, but the remaining are lost
   queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
-  queue.HandleT3RtxTimerExpiry();
-
-  EXPECT_THAT(queue.GetChunkStatesForTesting(),
-              ElementsAre(Pair(TSN(10), State::kAcked),              //
-                          Pair(TSN(11), State::kToBeRetransmitted),  //
-                          Pair(TSN(12), State::kToBeRetransmitted)));
 
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
       .WillOnce(Return(false));
-  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
 
-  // NOTE: No additional TSN representing the end fragment, as that's TSN=12.
+  queue.HandleT3RtxTimerExpiry();
+
   EXPECT_THAT(queue.GetChunkStatesForTesting(),
               ElementsAre(Pair(TSN(10), State::kAcked),      //
                           Pair(TSN(11), State::kAbandoned),  //
                           Pair(TSN(12), State::kAbandoned)));
 
+  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
   ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
   EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
   EXPECT_THAT(forward_tsn.skipped_streams(),
@@ -657,22 +650,14 @@
                           Pair(TSN(12), State::kNacked),  //
                           Pair(TSN(13), State::kAcked)));
 
-  queue.HandleT3RtxTimerExpiry();
-
-  EXPECT_THAT(queue.GetChunkStatesForTesting(),
-              ElementsAre(Pair(TSN(9), State::kAcked),               //
-                          Pair(TSN(10), State::kToBeRetransmitted),  //
-                          Pair(TSN(11), State::kToBeRetransmitted),  //
-                          Pair(TSN(12), State::kToBeRetransmitted),  //
-                          Pair(TSN(13), State::kAcked)));
-
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
       .WillOnce(Return(true));
   EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
       .WillOnce(Return(true));
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
       .WillOnce(Return(true));
-  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+  queue.HandleT3RtxTimerExpiry();
 
   EXPECT_THAT(queue.GetChunkStatesForTesting(),
               ElementsAre(Pair(TSN(9), State::kAcked),       //
@@ -685,6 +670,8 @@
                           Pair(TSN(15), State::kAbandoned),  //
                           Pair(TSN(16), State::kAbandoned)));
 
+  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
   IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
   EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
   EXPECT_THAT(
@@ -891,61 +878,6 @@
   EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
 }
 
-TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) {
-  RetransmissionQueue queue = CreateQueue();
-  EXPECT_CALL(producer_, Produce)
-      .WillOnce([this](TimeMs, size_t) {
-        SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
-        dts.max_retransmissions = 0;
-        return dts;
-      })
-      .WillOnce([this](TimeMs, size_t) {
-        SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
-        dts.max_retransmissions = 0;
-        return dts;
-      })
-      .WillOnce([this](TimeMs, size_t) {
-        SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
-        dts.max_retransmissions = 0;
-        return dts;
-      })
-      .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
-
-  // Send and ack first chunk (TSN 10)
-  std::vector<std::pair<TSN, Data>> chunks_to_send =
-      queue.GetChunksToSend(now_, 1000);
-  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
-                                          Pair(TSN(12), _)));
-  EXPECT_THAT(queue.GetChunkStatesForTesting(),
-              ElementsAre(Pair(TSN(9), State::kAcked),      //
-                          Pair(TSN(10), State::kInFlight),  //
-                          Pair(TSN(11), State::kInFlight),  //
-                          Pair(TSN(12), State::kInFlight)));
-  EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
-
-  // Discard the message while it was outstanding.
-  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
-      .Times(1);
-  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
-
-  EXPECT_THAT(queue.GetChunkStatesForTesting(),
-              ElementsAre(Pair(TSN(9), State::kAcked),       //
-                          Pair(TSN(10), State::kAbandoned),  //
-                          Pair(TSN(11), State::kAbandoned),  //
-                          Pair(TSN(12), State::kAbandoned)));
-  EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
-
-  // Now ACK those, one at a time.
-  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
-  EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u);
-
-  queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
-  EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u);
-
-  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
-  EXPECT_EQ(queue.outstanding_bytes(), 0u);
-}
-
 TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
   RetransmissionQueue queue = CreateQueue();
   EXPECT_CALL(producer_, Produce)
@@ -979,10 +911,10 @@
   EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
 
   // Mark the message as lost.
-  queue.HandleT3RtxTimerExpiry();
-
   EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
       .Times(1);
+  queue.HandleT3RtxTimerExpiry();
+
   EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
 
   EXPECT_THAT(queue.GetChunkStatesForTesting(),
@@ -1003,5 +935,248 @@
   EXPECT_EQ(queue.outstanding_bytes(), 0u);
 }
 
+TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
+  RetransmissionQueue queue = CreateQueue();
+  DataGeneratorOptions options;
+  options.stream_id = StreamID(17);
+  options.message_id = MID(42);
+  TimeMs test_start = now_;
+  EXPECT_CALL(producer_, Produce)
+      .WillOnce([&](TimeMs, size_t) {
+        SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options));
+        dts.expires_at = TimeMs(test_start + DurationMs(10));
+        return dts;
+      })
+      .WillOnce([&](TimeMs, size_t) {
+        SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options));
+        dts.expires_at = TimeMs(test_start + DurationMs(10));
+        return dts;
+      })
+      .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+  std::vector<std::pair<TSN, Data>> chunks_to_send =
+      queue.GetChunksToSend(now_, 24);
+  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42)))
+      .WillOnce(Return(true));
+  now_ += DurationMs(100);
+
+  EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
+
+  EXPECT_THAT(
+      queue.GetChunkStatesForTesting(),
+      ElementsAre(Pair(TSN(9), State::kAcked),         // Initial TSN
+                  Pair(TSN(10), State::kAbandoned),    // Produced
+                  Pair(TSN(11), State::kAbandoned),    // Produced and expired
+                  Pair(TSN(12), State::kAbandoned)));  // Placeholder end
+}
+
+TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
+  RetransmissionQueue queue = CreateQueue();
+  EXPECT_CALL(producer_, Produce)
+      .WillOnce([this](TimeMs, size_t) {
+        SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+        dts.max_retransmissions = 0;
+        return dts;
+      })
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  std::vector<std::pair<TSN, Data>> chunks_to_send =
+      queue.GetChunksToSend(now_, 1000);
+  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+                                          Pair(TSN(12), _), Pair(TSN(13), _)));
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),      //
+                          Pair(TSN(10), State::kInFlight),  //
+                          Pair(TSN(11), State::kInFlight),  //
+                          Pair(TSN(12), State::kInFlight),  //
+                          Pair(TSN(13), State::kInFlight)));
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+      .Times(0);
+
+  queue.HandleSack(
+      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),      //
+                          Pair(TSN(10), State::kNacked),    //
+                          Pair(TSN(11), State::kAcked),     //
+                          Pair(TSN(12), State::kInFlight),  //
+                          Pair(TSN(13), State::kInFlight)));
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  queue.HandleSack(
+      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),    //
+                          Pair(TSN(10), State::kNacked),  //
+                          Pair(TSN(11), State::kAcked),   //
+                          Pair(TSN(12), State::kAcked),   //
+                          Pair(TSN(13), State::kInFlight)));
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+      .WillOnce(Return(false));
+  queue.HandleSack(
+      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),       //
+                          Pair(TSN(10), State::kAbandoned),  //
+                          Pair(TSN(11), State::kAcked),      //
+                          Pair(TSN(12), State::kAcked),      //
+                          Pair(TSN(13), State::kAcked)));
+
+  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+}
+
+TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
+  // This is a fairly long test.
+  RetransmissionQueue queue = CreateQueue();
+  EXPECT_CALL(producer_, Produce)
+      .WillOnce([this](TimeMs, size_t) {
+        SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+        dts.max_retransmissions = 2;
+        return dts;
+      })
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillOnce(CreateChunk())
+      .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  std::vector<std::pair<TSN, Data>> chunks_to_send =
+      queue.GetChunksToSend(now_, 1000);
+  EXPECT_THAT(chunks_to_send,
+              ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _),
+                          Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _),
+                          Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _),
+                          Pair(TSN(19), _)));
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),      //
+                          Pair(TSN(10), State::kInFlight),  //
+                          Pair(TSN(11), State::kInFlight),  //
+                          Pair(TSN(12), State::kInFlight),  //
+                          Pair(TSN(13), State::kInFlight),  //
+                          Pair(TSN(14), State::kInFlight),  //
+                          Pair(TSN(15), State::kInFlight),  //
+                          Pair(TSN(16), State::kInFlight),  //
+                          Pair(TSN(17), State::kInFlight),  //
+                          Pair(TSN(18), State::kInFlight),  //
+                          Pair(TSN(19), State::kInFlight)));
+
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+      .Times(0);
+
+  // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it.
+  for (int tsn = 11; tsn <= 13; ++tsn) {
+    queue.HandleSack(
+        now_,
+        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+  }
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),               //
+                          Pair(TSN(10), State::kToBeRetransmitted),  //
+                          Pair(TSN(11), State::kAcked),              //
+                          Pair(TSN(12), State::kAcked),              //
+                          Pair(TSN(13), State::kAcked),              //
+                          Pair(TSN(14), State::kInFlight),           //
+                          Pair(TSN(15), State::kInFlight),           //
+                          Pair(TSN(16), State::kInFlight),           //
+                          Pair(TSN(17), State::kInFlight),           //
+                          Pair(TSN(18), State::kInFlight),           //
+                          Pair(TSN(19), State::kInFlight)));
+
+  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
+
+  // Ack TSN [14 to 16] - three more nacks - second and last retransmission.
+  for (int tsn = 14; tsn <= 16; ++tsn) {
+    queue.HandleSack(
+        now_,
+        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+  }
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),               //
+                          Pair(TSN(10), State::kToBeRetransmitted),  //
+                          Pair(TSN(11), State::kAcked),              //
+                          Pair(TSN(12), State::kAcked),              //
+                          Pair(TSN(13), State::kAcked),              //
+                          Pair(TSN(14), State::kAcked),              //
+                          Pair(TSN(15), State::kAcked),              //
+                          Pair(TSN(16), State::kAcked),              //
+                          Pair(TSN(17), State::kInFlight),           //
+                          Pair(TSN(18), State::kInFlight),           //
+                          Pair(TSN(19), State::kInFlight)));
+
+  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
+
+  // Ack TSN [17 to 18]
+  for (int tsn = 17; tsn <= 18; ++tsn) {
+    queue.HandleSack(
+        now_,
+        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+  }
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),    //
+                          Pair(TSN(10), State::kNacked),  //
+                          Pair(TSN(11), State::kAcked),   //
+                          Pair(TSN(12), State::kAcked),   //
+                          Pair(TSN(13), State::kAcked),   //
+                          Pair(TSN(14), State::kAcked),   //
+                          Pair(TSN(15), State::kAcked),   //
+                          Pair(TSN(16), State::kAcked),   //
+                          Pair(TSN(17), State::kAcked),   //
+                          Pair(TSN(18), State::kAcked),   //
+                          Pair(TSN(19), State::kInFlight)));
+
+  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+  // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions.
+  EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+      .WillOnce(Return(false));
+  queue.HandleSack(
+      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {}));
+
+  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
+
+  EXPECT_THAT(queue.GetChunkStatesForTesting(),
+              ElementsAre(Pair(TSN(9), State::kAcked),       //
+                          Pair(TSN(10), State::kAbandoned),  //
+                          Pair(TSN(11), State::kAcked),      //
+                          Pair(TSN(12), State::kAcked),      //
+                          Pair(TSN(13), State::kAcked),      //
+                          Pair(TSN(14), State::kAcked),      //
+                          Pair(TSN(15), State::kAcked),      //
+                          Pair(TSN(16), State::kAcked),      //
+                          Pair(TSN(17), State::kAcked),      //
+                          Pair(TSN(18), State::kAcked),      //
+                          Pair(TSN(19), State::kAcked)));
+
+  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+}  // namespace
+
 }  // namespace
 }  // namespace dcsctp