blob: e02b111b5a2b0c2d1ba93975e9263ca6d6c34ec6 [file] [log] [blame]
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/retransmission_queue.h"
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/sack_chunk.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_options.h"
#include "net/dcsctp/testing/data_generator.h"
#include "net/dcsctp/timer/fake_timeout.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/mock_send_queue.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::MockFunction;
using State = ::dcsctp::RetransmissionQueue::State;
using ::testing::_;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::Pair;
using ::testing::Return;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
constexpr uint32_t kArwnd = 100000;
constexpr uint32_t kMaxMtu = 1191;
class RetransmissionQueueTest : public testing::Test {
protected:
RetransmissionQueueTest()
: gen_(MID(42)),
timeout_manager_([this]() { return now_; }),
timer_manager_([this]() { return timeout_manager_.CreateTimeout(); }),
timer_(timer_manager_.CreateTimer(
"test/t3_rtx",
[]() { return absl::nullopt; },
TimerOptions(DurationMs(0)))) {}
std::function<SendQueue::DataToSend(TimeMs, size_t)> CreateChunk() {
return [this](TimeMs now, size_t max_size) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
};
}
std::vector<TSN> GetSentPacketTSNs(RetransmissionQueue& queue) {
std::vector<TSN> tsns;
for (const auto& elem : queue.GetChunksToSend(now_, 10000)) {
tsns.push_back(elem.first);
}
return tsns;
}
RetransmissionQueue CreateQueue(bool supports_partial_reliability = true,
bool use_message_interleaving = false) {
DcSctpOptions options;
options.mtu = kMaxMtu;
return RetransmissionQueue(
"", TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
on_clear_retransmission_counter_.AsStdFunction(), *timer_, options,
supports_partial_reliability, use_message_interleaving);
}
DataGenerator gen_;
TimeMs now_ = TimeMs(0);
FakeTimeoutManager timeout_manager_;
TimerManager timer_manager_;
NiceMock<MockFunction<void(DurationMs rtt_ms)>> on_rtt_;
NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
NiceMock<MockSendQueue> producer_;
std::unique_ptr<Timer> timer_;
};
TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) {
RetransmissionQueue queue = CreateQueue();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked)));
}
TEST_F(RetransmissionQueueTest, SendOneChunk) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked)));
}
TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12)));
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
TSN(15), TSN(16), TSN(17)));
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
{SackChunk::GapAckBlock(2, 3),
SackChunk::GapAckBlock(5, 5)},
{}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kNacked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kNacked), //
Pair(TSN(17), State::kAcked)));
}
TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
TSN(15), TSN(16), TSN(17)));
// Send more chunks, but leave some as gaps to force retransmission after
// three NACKs.
// Send 18
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));
// Ack 12, 14-15, 17-18
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
{SackChunk::GapAckBlock(2, 3),
SackChunk::GapAckBlock(5, 6)},
{}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kNacked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kNacked), //
Pair(TSN(17), State::kAcked), //
Pair(TSN(18), State::kAcked)));
// Send 19
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));
// Ack 12, 14-15, 17-19
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
{SackChunk::GapAckBlock(2, 3),
SackChunk::GapAckBlock(5, 7)},
{}));
// Send 20
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));
// Ack 12, 14-15, 17-20
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
{SackChunk::GapAckBlock(2, 3),
SackChunk::GapAckBlock(5, 8)},
{}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kToBeRetransmitted), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kToBeRetransmitted), //
Pair(TSN(17), State::kAcked), //
Pair(TSN(18), State::kAcked), //
Pair(TSN(19), State::kAcked), //
Pair(TSN(20), State::kAcked)));
// This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
// resent right now. The send queue will not even be queried.
EXPECT_CALL(producer_, Produce).Times(0);
EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13), TSN(16)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kInFlight), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kAcked), //
Pair(TSN(16), State::kInFlight), //
Pair(TSN(17), State::kAcked), //
Pair(TSN(18), State::kAcked), //
Pair(TSN(19), State::kAcked), //
Pair(TSN(20), State::kAcked)));
}
TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered({1, 2, 3, 4}, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
RetransmissionQueue queue =
CreateQueue(/*supports_partial_reliability=*/false);
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
} // namespace dcsctp
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
// Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned)));
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_rtx, testing::IsEmpty());
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned)));
}
TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 3;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(0);
// Retransmission 1
queue.HandleT3RtxTimerExpiry();
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 2
queue.HandleT3RtxTimerExpiry();
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 3
queue.HandleT3RtxTimerExpiry();
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));
// Retransmission 4 - not allowed.
queue.HandleT3RtxTimerExpiry();
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned)));
}
TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
RetransmissionQueue queue = CreateQueue();
static constexpr size_t kCwnd = 1200;
queue.set_cwnd(kCwnd);
EXPECT_EQ(queue.cwnd(), kCwnd);
EXPECT_EQ(queue.outstanding_bytes(), 0u);
std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce)
.WillOnce([this, payload](TimeMs, size_t) {
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1500);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
// Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
std::vector<std::pair<TSN, Data>> chunks_to_rtx =
queue.GetChunksToSend(now_, 1500);
EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
}
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
// NOTE: The TSN=13 represents the end fragment.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAbandoned)));
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
EXPECT_THAT(forward_tsn.skipped_streams(),
UnorderedElementsAre(
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
}
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E"));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
// NOTE: No additional TSN representing the end fragment, as that's TSN=12.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(forward_tsn.skipped_streams(),
UnorderedElementsAre(
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
}
TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(1);
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(2);
SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(3);
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(4);
SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _), Pair(TSN(13), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight)));
// Chunk 13 is acked, but the remaining are lost
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kNacked), //
Pair(TSN(11), State::kNacked), //
Pair(TSN(12), State::kNacked), //
Pair(TSN(13), State::kAcked)));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted), //
Pair(TSN(13), State::kAcked)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAcked),
// Representing end fragments of stream 1-3
Pair(TSN(14), State::kAbandoned), //
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(
forward_tsn1.skipped_streams(),
UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(1), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(true), StreamID(2), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(3), MID(42))));
// When TSN 13 is acked, the placeholder "end fragments" must be skipped as
// well.
// A receiver is more likely to ack TSN 13, but do it incrementally.
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_CALL(producer_, Discard).Times(0);
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAbandoned), //
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16));
EXPECT_THAT(
forward_tsn2.skipped_streams(),
UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(1), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(true), StreamID(2), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(3), MID(42))));
}
TEST_F(RetransmissionQueueTest, MeasureRTT) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
now_ = now_ + DurationMs(123);
EXPECT_CALL(on_rtt_, Call(DurationMs(123))).Times(1);
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
}
TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) {
RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
}
TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
TSN(15), TSN(16), TSN(17)));
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {})));
EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {})));
EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {})));
}
TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
TSN(15), TSN(16), TSN(17)));
// Ack 9, 20-25. This is an invalid SACK, but should still be handled.
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight), //
Pair(TSN(14), State::kInFlight), //
Pair(TSN(15), State::kInFlight), //
Pair(TSN(16), State::kInFlight), //
Pair(TSN(17), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) {
RetransmissionQueue queue = CreateQueue();
// Nothing produced - nothing in retransmission queue
// Ack 9, 12-13
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {}));
// Gap ack blocks are just ignore.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked)));
}
TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillOnce(CreateChunk())
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
EXPECT_THAT(GetSentPacketTSNs(queue),
testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
TSN(15), TSN(16), TSN(17)));
// Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be
// adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the
// cum-tsn-ack should not move, as the gap-ack-blocks are just advisory.
queue.HandleSack(
now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {}));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAcked), //
Pair(TSN(15), State::kInFlight), //
Pair(TSN(16), State::kInFlight), //
Pair(TSN(17), State::kInFlight)));
}
TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
RetransmissionQueue queue = CreateQueue();
// See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the
// magic numbers in this test.
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t size) {
EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(183);
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
})
.WillOnce([this](TimeMs, size_t size) {
EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);
std::vector<uint8_t> payload(957);
return SendQueue::DataToSend(gen_.Ordered(payload, "BE"));
});
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1188 - 12);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
}
TEST_F(RetransmissionQueueTest, AccountsInflightAbandonedChunksAsOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Discard the message while it was outstanding.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 2u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 1u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
}
TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
// Mark the message as lost.
queue.HandleT3RtxTimerExpiry();
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned)));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
// Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u);
}
} // namespace
} // namespace dcsctp