dcsctp: Abandon chunks consistently
The previous logic to abandon chunks when partial reliability was used
was a bit too eager and trigger happy.
* Chunks with limited retransmissions should only be abandoned when a
chunk is really considered lost. It should follow the same rules as
for retransmitting chunks - that it must be nacked three times or
due to a T3-RTX expiration. Before this change, a single SACK not
referencing it would be enough to abandon it. This resulted in a lot
of unnecessary sent FORWARD-TSN and undelivered messages - especially
if running with zero retransmissions.
The logic to expire chunks by limited retransmissions will now only
be applied when a chunk is actually nacked.
* The second partial reliability trigger - expiration time - wasn't
evaluated when producing a middle chunk of a larger message.
A number of test cases were added and updated as chunks will now be
abandoned immediately instead of first scheduled for retransmission and
later abandoned.
Bug: webrtc:12961
Change-Id: I0ae17b2672568bdbdc32073a99d4c24b09ff5fe9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/225548
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34458}
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index 4ab4249..7ca3d9b 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -91,6 +91,33 @@
return true;
}
+MATCHER_P(HasDataChunkWithPPID, ppid, "") {
+ absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
+ if (!packet.has_value()) {
+ *result_listener << "data didn't parse as an SctpPacket";
+ return false;
+ }
+
+ if (packet->descriptors()[0].type != DataChunk::kType) {
+ *result_listener << "the first chunk in the packet is not a data chunk";
+ return false;
+ }
+
+ absl::optional<DataChunk> dc =
+ DataChunk::Parse(packet->descriptors()[0].data);
+ if (!dc.has_value()) {
+ *result_listener << "The first chunk didn't parse as a data chunk";
+ return false;
+ }
+
+ if (dc->ppid() != ppid) {
+ *result_listener << "the ppid is " << *dc->ppid();
+ return false;
+ }
+
+ return true;
+}
+
MATCHER_P(HasDataChunkWithSsn, ssn, "") {
absl::optional<SctpPacket> packet = SctpPacket::Parse(arg);
if (!packet.has_value()) {
@@ -1049,7 +1076,14 @@
// Third DATA
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
- // Handle SACK
+ // Handle SACK for first DATA
+ sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
+
+ // Handle delayed SACK for third DATA
+ AdvanceTime(options_.delayed_ack_max_timeout);
+ RunTimers();
+
+ // Handle SACK for second DATA
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
// Now the missing data chunk will be marked as nacked, but it might still be
@@ -1065,11 +1099,7 @@
// FORWARD-TSN (third)
sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());
- // The receiver might have moved into delayed ack mode.
- AdvanceTime(options_.rto_initial);
- RunTimers();
-
- // Handle SACK
+ // Which will trigger a SACK
sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());
absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
@@ -1084,6 +1114,78 @@
EXPECT_FALSE(msg3.has_value());
}
+TEST_F(DcSctpSocketTest, SendManyFragmentedMessagesWithLimitedRtx) {
+ ConnectSockets();
+
+ SendOptions send_options;
+ send_options.unordered = IsUnordered(true);
+ send_options.max_retransmissions = 0;
+ std::vector<uint8_t> payload(options_.mtu * 2 - 100 /* margin */);
+ // Sending first message
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(51), payload), send_options);
+ // Sending second message
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(52), payload), send_options);
+ // Sending third message
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), payload), send_options);
+ // Sending fourth message
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(54), payload), send_options);
+
+ // First DATA, first fragment
+ std::vector<uint8_t> packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
+ sock_z_.ReceivePacket(std::move(packet));
+
+ // First DATA, second fragment (lost)
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(51)));
+
+ // Second DATA, first fragment
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
+ sock_z_.ReceivePacket(std::move(packet));
+
+ // Second DATA, second fragment (lost)
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(52)));
+ EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+
+ // Third DATA, first fragment
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
+ EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+ sock_z_.ReceivePacket(std::move(packet));
+
+ // Third DATA, second fragment (lost)
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(53)));
+ EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+
+ // Fourth DATA, first fragment
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
+ EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+ sock_z_.ReceivePacket(std::move(packet));
+
+ // Fourth DATA, second fragment
+ packet = cb_a_.ConsumeSentPacket();
+ EXPECT_THAT(packet, HasDataChunkWithPPID(PPID(54)));
+ EXPECT_THAT(packet, HasDataChunkWithSsn(SSN(0)));
+ sock_z_.ReceivePacket(std::move(packet));
+
+ ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
+
+ // Let the RTX timer expire, and exchange FORWARD-TSN/SACKs
+ AdvanceTime(options_.rto_initial);
+ RunTimers();
+ ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
+
+ absl::optional<DcSctpMessage> msg1 = cb_z_.ConsumeReceivedMessage();
+ ASSERT_TRUE(msg1.has_value());
+ EXPECT_EQ(msg1->ppid(), PPID(54));
+
+ ASSERT_FALSE(cb_z_.ConsumeReceivedMessage().has_value());
+}
+
struct FakeChunkConfig : ChunkConfig {
static constexpr int kType = 0x49;
static constexpr size_t kHeaderSize = 4;
diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc
index ef2f0e3..51bb65a 100644
--- a/net/dcsctp/tx/retransmission_queue.cc
+++ b/net/dcsctp/tx/retransmission_queue.cc
@@ -188,16 +188,8 @@
for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
if (iter->first <= max_tsn_to_nack) {
- if (iter->second.is_outstanding()) {
- outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
- }
-
- if (iter->second.Nack()) {
- ack_info.has_packet_loss = true;
- to_be_retransmitted_.insert(iter->first);
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
- << " marked for retransmission";
- }
+ ack_info.has_packet_loss =
+ NackItem(iter->first, iter->second, /*retransmit_now=*/false);
}
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
@@ -499,20 +491,11 @@
// T3-rtx timer expired but did not fit in one MTU (rule E3 above) should be
// marked for retransmission and sent as soon as cwnd allows (normally, when a
// SACK arrives)."
- int count = 0;
for (auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
TxData& item = elem.second;
if (!item.is_acked()) {
- if (item.is_outstanding()) {
- outstanding_bytes_ -= GetSerializedChunkSize(item.data());
- }
- if (item.Nack(/*retransmit_now=*/true)) {
- to_be_retransmitted_.insert(tsn);
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
- << " will be retransmitted due to T3-RTX";
- ++count;
- }
+ NackItem(tsn, item, /*retransmit_now=*/true);
}
}
@@ -524,12 +507,33 @@
RTC_DLOG(LS_INFO) << log_prefix_ << "t3-rtx expired. new cwnd=" << cwnd_
<< " (" << old_cwnd << "), ssthresh=" << ssthresh_
- << ", rtx-packets=" << count << ", outstanding_bytes "
- << outstanding_bytes_ << " (" << old_outstanding_bytes
- << ")";
+ << ", outstanding_bytes " << outstanding_bytes_ << " ("
+ << old_outstanding_bytes << ")";
RTC_DCHECK(IsConsistent());
}
+bool RetransmissionQueue::NackItem(UnwrappedTSN tsn,
+ TxData& item,
+ bool retransmit_now) {
+ if (item.is_outstanding()) {
+ outstanding_bytes_ -= GetSerializedChunkSize(item.data());
+ }
+
+ switch (item.Nack(retransmit_now)) {
+ case TxData::NackAction::kNothing:
+ return false;
+ case TxData::NackAction::kRetransmit:
+ to_be_retransmitted_.insert(tsn);
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << *tsn.Wrap()
+ << " marked for retransmission";
+ break;
+ case TxData::NackAction::kAbandon:
+ AbandonAllFor(item);
+ break;
+ }
+ return true;
+}
+
std::vector<std::pair<TSN, Data>>
RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
std::vector<std::pair<TSN, Data>> result;
@@ -615,17 +619,35 @@
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();
- to_be_sent.emplace_back(tsn.Wrap(), chunk_opt->data.Clone());
// All chunks are always padded to be even divisible by 4.
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
max_bytes -= chunk_size;
outstanding_bytes_ += chunk_size;
rwnd_ -= chunk_size;
- outstanding_data_.emplace(
- tsn, RetransmissionQueue::TxData(std::move(chunk_opt->data),
- chunk_opt->max_retransmissions, now,
- chunk_opt->expires_at));
+ auto item_it =
+ outstanding_data_
+ .emplace(tsn,
+ RetransmissionQueue::TxData(
+ chunk_opt->data.Clone(),
+ partial_reliability_ ? chunk_opt->max_retransmissions
+ : absl::nullopt,
+ now,
+ partial_reliability_ ? chunk_opt->expires_at
+ : absl::nullopt))
+ .first;
+
+ if (item_it->second.has_expired(now)) {
+ // No need to send it - it was expired when it was in the send
+ // queue.
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_ << "Marking freshly produced chunk "
+ << *item_it->first.Wrap() << " and message "
+ << *item_it->second.data().message_id << " as expired";
+ AbandonAllFor(item_it->second);
+ } else {
+ to_be_sent.emplace_back(tsn.Wrap(), std::move(chunk_opt->data));
+ }
}
}
@@ -684,7 +706,7 @@
if (!partial_reliability_) {
return false;
}
- ExpireChunks(now);
+ ExpireOutstandingChunks(now);
if (!outstanding_data_.empty()) {
auto it = outstanding_data_.begin();
return it->first == last_cumulative_tsn_ack_.next_value() &&
@@ -699,15 +721,22 @@
should_be_retransmitted_ = false;
}
-bool RetransmissionQueue::TxData::Nack(bool retransmit_now) {
+RetransmissionQueue::TxData::NackAction RetransmissionQueue::TxData::Nack(
+ bool retransmit_now) {
ack_state_ = AckState::kNacked;
++nack_count_;
if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) &&
!is_abandoned_) {
- should_be_retransmitted_ = true;
- return true;
+ // Nacked enough times - it's considered lost.
+ if (!max_retransmissions_.has_value() ||
+ num_retransmissions_ < max_retransmissions_) {
+ should_be_retransmitted_ = true;
+ return NackAction::kRetransmit;
+ }
+ Abandon();
+ return NackAction::kAbandon;
}
- return false;
+ return NackAction::kNothing;
}
void RetransmissionQueue::TxData::Retransmit() {
@@ -724,33 +753,24 @@
}
bool RetransmissionQueue::TxData::has_expired(TimeMs now) const {
- if (ack_state_ != AckState::kAcked && !is_abandoned_) {
- if (max_retransmissions_.has_value() &&
- num_retransmissions_ >= *max_retransmissions_) {
- return true;
- } else if (expires_at_.has_value() && *expires_at_ <= now) {
- return true;
- }
- }
- return false;
+ return expires_at_.has_value() && *expires_at_ <= now;
}
-void RetransmissionQueue::ExpireChunks(TimeMs now) {
+void RetransmissionQueue::ExpireOutstandingChunks(TimeMs now) {
for (const auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
const TxData& item = elem.second;
- // Chunks that are in-flight (possibly lost?), nacked or to be retransmitted
- // can be expired easily. There is always a risk that a message is expired
- // that was already received by the peer, but for which there haven't been
- // a SACK received. But that's acceptable, and handled.
+ // Chunks that are nacked can be expired. Care should be taken not to expire
+ // unacked (in-flight) chunks as they might have been received, but the SACK
+ // is either delayed or in-flight and may be received later.
if (item.is_abandoned()) {
// Already abandoned.
- } else if (item.has_expired(now)) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
- << " and message " << *item.data().message_id
- << " as expired";
- ExpireAllFor(item);
+ } else if (item.is_nacked() && item.has_expired(now)) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking nacked chunk "
+ << *tsn.Wrap() << " and message "
+ << *item.data().message_id << " as expired";
+ AbandonAllFor(item);
} else {
// A non-expired chunk. No need to iterate any further.
break;
@@ -758,7 +778,7 @@
}
}
-void RetransmissionQueue::ExpireAllFor(
+void RetransmissionQueue::AbandonAllFor(
const RetransmissionQueue::TxData& item) {
// Erase all remaining chunks from the producer, if any.
if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
diff --git a/net/dcsctp/tx/retransmission_queue.h b/net/dcsctp/tx/retransmission_queue.h
index 7f5baf9..c5a6a04 100644
--- a/net/dcsctp/tx/retransmission_queue.h
+++ b/net/dcsctp/tx/retransmission_queue.h
@@ -143,6 +143,12 @@
// its associated metadata.
class TxData {
public:
+ enum class NackAction {
+ kNothing,
+ kRetransmit,
+ kAbandon,
+ };
+
explicit TxData(Data data,
absl::optional<size_t> max_retransmissions,
TimeMs time_sent,
@@ -160,9 +166,10 @@
void Ack();
// Nacks an item. If it has been nacked enough times, or if `retransmit_now`
- // is set, it might be marked for retransmission, which is indicated by the
- // return value.
- bool Nack(bool retransmit_now = false);
+ // is set, it might be marked for retransmission. If the item has reached
+ // its max retransmission value, it will instead be abandoned. The action
+ // performed is indicated as return value.
+ NackAction Nack(bool retransmit_now = false);
// Prepares the item to be retransmitted. Sets it as outstanding and
// clears all nack counters.
@@ -173,6 +180,7 @@
bool is_outstanding() const { return ack_state_ == AckState::kUnacked; }
bool is_acked() const { return ack_state_ == AckState::kAcked; }
+ bool is_nacked() const { return ack_state_ == AckState::kNacked; }
bool is_abandoned() const { return is_abandoned_; }
// Indicates if this chunk should be retransmitted.
@@ -264,6 +272,14 @@
// by setting `bytes_acked_by_cumulative_tsn_ack` and `acked_tsns`.
void RemoveAcked(UnwrappedTSN cumulative_tsn_ack, AckInfo& ack_info);
+ // Helper method to nack an item and perform the correct operations given the
+ // action indicated when nacking an item (e.g. retransmitting or abandoning).
+ // The return value indicate if an action was performed, meaning that packet
+ // loss was detected and acted upon.
+ bool NackItem(UnwrappedTSN cumulative_tsn_ack,
+ TxData& item,
+ bool retransmit_now);
+
// Will mark the chunks covered by the `gap_ack_blocks` from an incoming SACK
// as "acked" and update `ack_info` by adding new TSNs to `added_tsns`.
void AckGapBlocks(UnwrappedTSN cumulative_tsn_ack,
@@ -307,13 +323,13 @@
// is running.
void StartT3RtxTimerIfOutstandingData();
- // Given the current time `now_ms`, expire chunks that have a limited
- // lifetime.
- void ExpireChunks(TimeMs now);
- // Given that a message fragment, `item` has expired, expire all other
- // fragments that share the same message - even never-before-sent fragments
- // that are still in the SendQueue.
- void ExpireAllFor(const RetransmissionQueue::TxData& item);
+ // Given the current time `now_ms`, expire and abandon outstanding (sent at
+ // least once) chunks that have a limited lifetime.
+ void ExpireOutstandingChunks(TimeMs now);
+ // Given that a message fragment, `item` has been abandoned, abandon all other
+ // fragments that share the same message - both never-before-sent fragments
+ // that are still in the SendQueue and outstanding chunks.
+ void AbandonAllFor(const RetransmissionQueue::TxData& item);
// Returns the current congestion control algorithm phase.
CongestionAlgorithmPhase phase() const {
diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc
index e02b111..4aa76d6 100644
--- a/net/dcsctp/tx/retransmission_queue_test.cc
+++ b/net/dcsctp/tx/retransmission_queue_test.cc
@@ -378,14 +378,14 @@
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(1);
+
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
- Pair(TSN(10), State::kToBeRetransmitted)));
-
- EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
- .Times(1);
+ Pair(TSN(10), State::kAbandoned)));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
@@ -438,9 +438,9 @@
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 4 - not allowed.
- queue.HandleT3RtxTimerExpiry();
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
+ queue.HandleT3RtxTimerExpiry();
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
@@ -521,16 +521,11 @@
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
- queue.HandleT3RtxTimerExpiry();
-
- EXPECT_THAT(queue.GetChunkStatesForTesting(),
- ElementsAre(Pair(TSN(10), State::kAcked), //
- Pair(TSN(11), State::kToBeRetransmitted), //
- Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
- EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ queue.HandleT3RtxTimerExpiry();
// NOTE: The TSN=13 represents the end fragment.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
@@ -539,6 +534,8 @@
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAbandoned)));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
EXPECT_THAT(forward_tsn.skipped_streams(),
@@ -579,23 +576,19 @@
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
- queue.HandleT3RtxTimerExpiry();
-
- EXPECT_THAT(queue.GetChunkStatesForTesting(),
- ElementsAre(Pair(TSN(10), State::kAcked), //
- Pair(TSN(11), State::kToBeRetransmitted), //
- Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
- EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
- // NOTE: No additional TSN representing the end fragment, as that's TSN=12.
+ queue.HandleT3RtxTimerExpiry();
+
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(forward_tsn.skipped_streams(),
@@ -657,22 +650,14 @@
Pair(TSN(12), State::kNacked), //
Pair(TSN(13), State::kAcked)));
- queue.HandleT3RtxTimerExpiry();
-
- EXPECT_THAT(queue.GetChunkStatesForTesting(),
- ElementsAre(Pair(TSN(9), State::kAcked), //
- Pair(TSN(10), State::kToBeRetransmitted), //
- Pair(TSN(11), State::kToBeRetransmitted), //
- Pair(TSN(12), State::kToBeRetransmitted), //
- Pair(TSN(13), State::kAcked)));
-
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
.WillOnce(Return(true));
- EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
+ queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
@@ -685,6 +670,8 @@
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+
IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(
@@ -891,61 +878,6 @@
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
}
-TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) {
- RetransmissionQueue queue = CreateQueue();
- EXPECT_CALL(producer_, Produce)
- .WillOnce([this](TimeMs, size_t) {
- SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
- dts.max_retransmissions = 0;
- return dts;
- })
- .WillOnce([this](TimeMs, size_t) {
- SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
- dts.max_retransmissions = 0;
- return dts;
- })
- .WillOnce([this](TimeMs, size_t) {
- SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
- dts.max_retransmissions = 0;
- return dts;
- })
- .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
-
- // Send and ack first chunk (TSN 10)
- std::vector<std::pair<TSN, Data>> chunks_to_send =
- queue.GetChunksToSend(now_, 1000);
- EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
- Pair(TSN(12), _)));
- EXPECT_THAT(queue.GetChunkStatesForTesting(),
- ElementsAre(Pair(TSN(9), State::kAcked), //
- Pair(TSN(10), State::kInFlight), //
- Pair(TSN(11), State::kInFlight), //
- Pair(TSN(12), State::kInFlight)));
- EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
-
- // Discard the message while it was outstanding.
- EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
- .Times(1);
- EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
-
- EXPECT_THAT(queue.GetChunkStatesForTesting(),
- ElementsAre(Pair(TSN(9), State::kAcked), //
- Pair(TSN(10), State::kAbandoned), //
- Pair(TSN(11), State::kAbandoned), //
- Pair(TSN(12), State::kAbandoned)));
- EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
-
- // Now ACK those, one at a time.
- queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u);
-
- queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u);
-
- queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
- EXPECT_EQ(queue.outstanding_bytes(), 0u);
-}
-
TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
@@ -979,10 +911,10 @@
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Mark the message as lost.
- queue.HandleT3RtxTimerExpiry();
-
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
+ queue.HandleT3RtxTimerExpiry();
+
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
@@ -1003,5 +935,248 @@
EXPECT_EQ(queue.outstanding_bytes(), 0u);
}
+TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
+ RetransmissionQueue queue = CreateQueue();
+ DataGeneratorOptions options;
+ options.stream_id = StreamID(17);
+ options.message_id = MID(42);
+ TimeMs test_start = now_;
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([&](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", options));
+ dts.expires_at = TimeMs(test_start + DurationMs(10));
+ return dts;
+ })
+ .WillOnce([&](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "", options));
+ dts.expires_at = TimeMs(test_start + DurationMs(10));
+ return dts;
+ })
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 24);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(17), MID(42)))
+ .WillOnce(Return(true));
+ now_ += DurationMs(100);
+
+ EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());
+
+ EXPECT_THAT(
+ queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), // Initial TSN
+ Pair(TSN(10), State::kAbandoned), // Produced
+ Pair(TSN(11), State::kAbandoned), // Produced and expired
+ Pair(TSN(12), State::kAbandoned))); // Placeholder end
+}
+
+TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ dts.max_retransmissions = 0;
+ return dts;
+ })
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
+ Pair(TSN(12), _), Pair(TSN(13), _)));
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight), //
+ Pair(TSN(13), State::kInFlight)));
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(0);
+
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kNacked), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kInFlight), //
+ Pair(TSN(13), State::kInFlight)));
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kNacked), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kInFlight)));
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .WillOnce(Return(false));
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked)));
+
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+}
+
+TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
+ // This is a fairly long test.
+ RetransmissionQueue queue = CreateQueue();
+ EXPECT_CALL(producer_, Produce)
+ .WillOnce([this](TimeMs, size_t) {
+ SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
+ dts.max_retransmissions = 2;
+ return dts;
+ })
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillOnce(CreateChunk())
+ .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ std::vector<std::pair<TSN, Data>> chunks_to_send =
+ queue.GetChunksToSend(now_, 1000);
+ EXPECT_THAT(chunks_to_send,
+ ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _),
+ Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _),
+ Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _),
+ Pair(TSN(19), _)));
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kInFlight), //
+ Pair(TSN(11), State::kInFlight), //
+ Pair(TSN(12), State::kInFlight), //
+ Pair(TSN(13), State::kInFlight), //
+ Pair(TSN(14), State::kInFlight), //
+ Pair(TSN(15), State::kInFlight), //
+ Pair(TSN(16), State::kInFlight), //
+ Pair(TSN(17), State::kInFlight), //
+ Pair(TSN(18), State::kInFlight), //
+ Pair(TSN(19), State::kInFlight)));
+
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .Times(0);
+
+ // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it.
+ for (int tsn = 11; tsn <= 13; ++tsn) {
+ queue.HandleSack(
+ now_,
+ SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+ }
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kInFlight), //
+ Pair(TSN(15), State::kInFlight), //
+ Pair(TSN(16), State::kInFlight), //
+ Pair(TSN(17), State::kInFlight), //
+ Pair(TSN(18), State::kInFlight), //
+ Pair(TSN(19), State::kInFlight)));
+
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
+
+ // Ack TSN [14 to 16] - three more nacks - second and last retransmission.
+ for (int tsn = 14; tsn <= 16; ++tsn) {
+ queue.HandleSack(
+ now_,
+ SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+ }
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kToBeRetransmitted), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kAcked), //
+ Pair(TSN(17), State::kInFlight), //
+ Pair(TSN(18), State::kInFlight), //
+ Pair(TSN(19), State::kInFlight)));
+
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));
+
+ // Ack TSN [17 to 18]
+ for (int tsn = 17; tsn <= 18; ++tsn) {
+ queue.HandleSack(
+ now_,
+ SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
+ }
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kNacked), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kAcked), //
+ Pair(TSN(17), State::kAcked), //
+ Pair(TSN(18), State::kAcked), //
+ Pair(TSN(19), State::kInFlight)));
+
+ EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
+
+ // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions.
+ EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
+ .WillOnce(Return(false));
+ queue.HandleSack(
+ now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {}));
+
+ EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
+
+ EXPECT_THAT(queue.GetChunkStatesForTesting(),
+ ElementsAre(Pair(TSN(9), State::kAcked), //
+ Pair(TSN(10), State::kAbandoned), //
+ Pair(TSN(11), State::kAcked), //
+ Pair(TSN(12), State::kAcked), //
+ Pair(TSN(13), State::kAcked), //
+ Pair(TSN(14), State::kAcked), //
+ Pair(TSN(15), State::kAcked), //
+ Pair(TSN(16), State::kAcked), //
+ Pair(TSN(17), State::kAcked), //
+ Pair(TSN(18), State::kAcked), //
+ Pair(TSN(19), State::kAcked)));
+
+ EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
+} // namespace
+
} // namespace
} // namespace dcsctp