/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "net/dcsctp/tx/retransmission_queue.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "absl/types/optional.h"
#include "api/array_view.h"
#include "api/task_queue/task_queue_base.h"
#include "net/dcsctp/common/handover_testing.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/packet/chunk/data_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/sack_chunk.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_options.h"
#include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h"
#include "net/dcsctp/testing/data_generator.h"
#include "net/dcsctp/testing/testing_macros.h"
#include "net/dcsctp/timer/fake_timeout.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/mock_send_queue.h"
#include "net/dcsctp/tx/send_queue.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"

namespace dcsctp {
namespace {
using ::testing::MockFunction;
using State = ::dcsctp::RetransmissionQueue::State;
using ::testing::_;
using ::testing::ElementsAre;
using ::testing::Field;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::Pair;
using ::testing::Return;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
using ::webrtc::TimeDelta;
using ::webrtc::Timestamp;

constexpr uint32_t kArwnd = 100000;
constexpr uint32_t kMaxMtu = 1191;
constexpr OutgoingMessageId kMessageId = OutgoingMessageId(42);

DcSctpOptions MakeOptions() {
  DcSctpOptions options;
  options.mtu = kMaxMtu;
  return options;
}

class RetransmissionQueueTest : public testing::Test {
 protected:
  RetransmissionQueueTest()
      : options_(MakeOptions()),
        gen_(MID(42)),
        timeout_manager_([this]() { return now_; }),
        timer_manager_([this](webrtc::TaskQueueBase::DelayPrecision precision) {
          return timeout_manager_.CreateTimeout(precision);
        }),
        timer_(timer_manager_.CreateTimer(
            "test/t3_rtx",
            []() { return TimeDelta::Zero(); },
            TimerOptions(options_.rto_initial.ToTimeDelta()))) {}

  std::function<SendQueue::DataToSend(Timestamp, size_t)> CreateChunk(
      OutgoingMessageId message_id) {
    return [this, message_id](Timestamp now, size_t max_size) {
      return SendQueue::DataToSend(message_id,
                                   gen_.Ordered({1, 2, 3, 4}, "BE"));
    };
  }

  std::vector<TSN> GetTSNsForFastRetransmit(RetransmissionQueue& queue) {
    std::vector<TSN> tsns;
    for (const auto& elem : queue.GetChunksForFastRetransmit(10000)) {
      tsns.push_back(elem.first);
    }
    return tsns;
  }

  std::vector<TSN> GetSentPacketTSNs(RetransmissionQueue& queue) {
    std::vector<TSN> tsns;
    for (const auto& elem : queue.GetChunksToSend(now_, 10000)) {
      tsns.push_back(elem.first);
    }
    return tsns;
  }

  RetransmissionQueue CreateQueue(bool supports_partial_reliability = true,
                                  bool use_message_interleaving = false) {
    return RetransmissionQueue(
        "", &callbacks_, TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
        on_clear_retransmission_counter_.AsStdFunction(), *timer_, options_,
        supports_partial_reliability, use_message_interleaving);
  }

  std::unique_ptr<RetransmissionQueue> CreateQueueByHandover(
      RetransmissionQueue& queue) {
    EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus());
    DcSctpSocketHandoverState state;
    queue.AddHandoverState(state);
    g_handover_state_transformer_for_test(&state);
    auto queue2 = std::make_unique<RetransmissionQueue>(
        "", &callbacks_, TSN(10), kArwnd, producer_, on_rtt_.AsStdFunction(),
        on_clear_retransmission_counter_.AsStdFunction(), *timer_, options_,
        /*supports_partial_reliability=*/true,
        /*use_message_interleaving=*/false);
    queue2->RestoreFromState(state);
    return queue2;
  }

  MockDcSctpSocketCallbacks callbacks_;
  DcSctpOptions options_;
  DataGenerator gen_;
  Timestamp now_ = Timestamp::Zero();
  FakeTimeoutManager timeout_manager_;
  TimerManager timer_manager_;
  NiceMock<MockFunction<void(TimeDelta rtt_ms)>> on_rtt_;
  NiceMock<MockFunction<void()>> on_clear_retransmission_counter_;
  NiceMock<MockSendQueue> producer_;
  std::unique_ptr<Timer> timer_;
};

TEST_F(RetransmissionQueueTest, InitialAckedPrevTsn) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked)));
}

TEST_F(RetransmissionQueueTest, SendOneChunk) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, SendOneChunkAndAck) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(10)));

  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked)));
}

TEST_F(RetransmissionQueueTest, SendThreeChunksAndAckTwo) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12)));

  queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(11), State::kAcked),  //
                          Pair(TSN(12), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, AckWithGapBlocksFromRFC4960Section334) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
                                   TSN(15), TSN(16), TSN(17)));

  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 5)},
                                   {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(12), State::kAcked),   //
                          Pair(TSN(13), State::kNacked),  //
                          Pair(TSN(14), State::kAcked),   //
                          Pair(TSN(15), State::kAcked),   //
                          Pair(TSN(16), State::kNacked),  //
                          Pair(TSN(17), State::kAcked)));
}

TEST_F(RetransmissionQueueTest, ResendPacketsWhenNackedThreeTimes) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
                                   TSN(15), TSN(16), TSN(17)));

  // Send more chunks, but leave some as gaps to force retransmission after
  // three NACKs.

  // Send 18
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(8)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(18)));

  // Ack 12, 14-15, 17-18
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 6)},
                                   {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(12), State::kAcked),   //
                          Pair(TSN(13), State::kNacked),  //
                          Pair(TSN(14), State::kAcked),   //
                          Pair(TSN(15), State::kAcked),   //
                          Pair(TSN(16), State::kNacked),  //
                          Pair(TSN(17), State::kAcked),   //
                          Pair(TSN(18), State::kAcked)));

  // Send 19
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(9)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(19)));

  // Ack 12, 14-15, 17-19
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 7)},
                                   {}));

  // Send 20
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(10)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(20)));

  // Ack 12, 14-15, 17-20
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 8)},
                                   {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(12), State::kAcked),              //
                          Pair(TSN(13), State::kToBeRetransmitted),  //
                          Pair(TSN(14), State::kAcked),              //
                          Pair(TSN(15), State::kAcked),              //
                          Pair(TSN(16), State::kToBeRetransmitted),  //
                          Pair(TSN(17), State::kAcked),              //
                          Pair(TSN(18), State::kAcked),              //
                          Pair(TSN(19), State::kAcked),              //
                          Pair(TSN(20), State::kAcked)));

  // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
  // resent right now. The send queue will not even be queried.
  EXPECT_CALL(producer_, Produce).Times(0);

  EXPECT_THAT(GetTSNsForFastRetransmit(queue),
              testing::ElementsAre(TSN(13), TSN(16)));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(12), State::kAcked),     //
                          Pair(TSN(13), State::kInFlight),  //
                          Pair(TSN(14), State::kAcked),     //
                          Pair(TSN(15), State::kAcked),     //
                          Pair(TSN(16), State::kInFlight),  //
                          Pair(TSN(17), State::kAcked),     //
                          Pair(TSN(18), State::kAcked),     //
                          Pair(TSN(19), State::kAcked),     //
                          Pair(TSN(20), State::kAcked)));
}

TEST_F(RetransmissionQueueTest, RestartsT3RtxOnRetransmitFirstOutstandingTSN) {
  // Verifies that if fast retransmit is retransmitting the first outstanding
  // TSN, it will also restart T3-RTX.
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  static constexpr Timestamp kStartTime = Timestamp::Seconds(100);
  now_ = kStartTime;

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12)));

  // Ack 10, 12, after 100ms.
  now_ += TimeDelta::Millis(100);
  queue.HandleSack(
      now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked),   //
                          Pair(TSN(11), State::kNacked),  //
                          Pair(TSN(12), State::kAcked)));

  // Send 13
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(13)));

  // Ack 10, 12-13, after 100ms.
  now_ += TimeDelta::Millis(100);
  queue.HandleSack(
      now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));

  // Send 14
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), testing::ElementsAre(TSN(14)));

  // Ack 10, 12-14, after 100 ms.
  now_ += TimeDelta::Millis(100);
  queue.HandleSack(
      now_, SackChunk(TSN(10), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked),              //
                          Pair(TSN(11), State::kToBeRetransmitted),  //
                          Pair(TSN(12), State::kAcked),              //
                          Pair(TSN(13), State::kAcked),              //
                          Pair(TSN(14), State::kAcked)));

  // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
  // resent right now. The send queue will not even be queried.
  EXPECT_CALL(producer_, Produce).Times(0);

  EXPECT_THAT(GetTSNsForFastRetransmit(queue), testing::ElementsAre(TSN(11)));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked),     //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kAcked),     //
                          Pair(TSN(13), State::kAcked),     //
                          Pair(TSN(14), State::kAcked)));

  // Verify that the timer was really restarted when fast-retransmitting. The
  // timeout is `options_.rto_initial`, so advance the time just before that.
  now_ += options_.rto_initial.ToTimeDelta() - TimeDelta::Millis(1);
  EXPECT_FALSE(timeout_manager_.GetNextExpiredTimeout().has_value());

  // And ensure it really is running.
  now_ += TimeDelta::Millis(1);
  ASSERT_HAS_VALUE_AND_ASSIGN(TimeoutID timeout,
                              timeout_manager_.GetNextExpiredTimeout());
  // An expired timeout has to be handled (asserts validate this).
  timer_manager_.HandleTimeout(timeout);
}

TEST_F(RetransmissionQueueTest, CanOnlyProduceTwoPacketsButWantsToSendThree) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered({1, 2, 3, 4}, "BE"));
      })
      .WillOnce([this](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(1),
                                     gen_.Ordered({1, 2, 3, 4}, "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, RetransmitsOnT3Expiry) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered({1, 2, 3, 4}, "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));

  // Will force chunks to be retransmitted
  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kToBeRetransmitted)));

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kToBeRetransmitted)));

  std::vector<std::pair<TSN, Data>> chunks_to_rtx =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
  RetransmissionQueue queue =
      CreateQueue(/*supports_partial_reliability=*/false);
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));

  // Will force chunks to be retransmitted
  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kToBeRetransmitted)));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);
  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
}  // namespace dcsctp

TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));

  // Will force chunks to be retransmitted
  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);

  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kAbandoned)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kAbandoned)));

  std::vector<std::pair<TSN, Data>> chunks_to_rtx =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_rtx, testing::IsEmpty());
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kAbandoned)));
}

TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
        dts.max_retransmissions = MaxRetransmits(3);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);

  // Retransmission 1
  queue.HandleT3RtxTimerExpiry();
  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));

  // Retransmission 2
  queue.HandleT3RtxTimerExpiry();
  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));

  // Retransmission 3
  queue.HandleT3RtxTimerExpiry();
  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), SizeIs(1));

  // Retransmission 4 - not allowed.
  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
  queue.HandleT3RtxTimerExpiry();
  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kAbandoned)));
}

TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
  RetransmissionQueue queue = CreateQueue();
  static constexpr size_t kCwnd = 1200;
  queue.set_cwnd(kCwnd);
  EXPECT_EQ(queue.cwnd(), kCwnd);
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);

  std::vector<uint8_t> payload(1000);
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1500);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));
  EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
  EXPECT_EQ(queue.outstanding_items(), 1u);

  // Will force chunks to be retransmitted
  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kToBeRetransmitted)));
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);

  std::vector<std::pair<TSN, Data>> chunks_to_rtx =
      queue.GetChunksToSend(now_, 1500);
  EXPECT_THAT(chunks_to_rtx, ElementsAre(Pair(TSN(10), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),  //
                          Pair(TSN(10), State::kInFlight)));
  EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
  EXPECT_EQ(queue.outstanding_items(), 1u);
}

TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId,
                                  gen_.Ordered({9, 10, 11, 12}, ""));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  // Send and ack first chunk (TSN 10)
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
                                          Pair(TSN(12), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight)));

  // Chunk 10 is acked, but the remaining are lost
  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
      .WillOnce(Return(true));

  queue.HandleT3RtxTimerExpiry();

  // NOTE: The TSN=13 represents the end fragment.
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked),      //
                          Pair(TSN(11), State::kAbandoned),  //
                          Pair(TSN(12), State::kAbandoned),  //
                          Pair(TSN(13), State::kAbandoned)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
  EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
  EXPECT_THAT(forward_tsn.skipped_streams(),
              UnorderedElementsAre(
                  ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
}

TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId,
                                  gen_.Ordered({9, 10, 11, 12}, "E"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  // Send and ack first chunk (TSN 10)
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
                                          Pair(TSN(12), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight)));

  // Chunk 10 is acked, but the remaining are lost
  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
      .WillOnce(Return(false));

  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(10), State::kAcked),      //
                          Pair(TSN(11), State::kAbandoned),  //
                          Pair(TSN(12), State::kAbandoned)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
  EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
  EXPECT_THAT(forward_tsn.skipped_streams(),
              UnorderedElementsAre(
                  ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
}

TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
  RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        DataGeneratorOptions opts;
        opts.stream_id = StreamID(1);
        SendQueue::DataToSend dts(OutgoingMessageId(42),
                                  gen_.Ordered({1, 2, 3, 4}, "B", opts));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        DataGeneratorOptions opts;
        opts.stream_id = StreamID(2);
        SendQueue::DataToSend dts(OutgoingMessageId(43),
                                  gen_.Unordered({1, 2, 3, 4}, "B", opts));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        DataGeneratorOptions opts;
        opts.stream_id = StreamID(3);
        SendQueue::DataToSend dts(OutgoingMessageId(44),
                                  gen_.Ordered({9, 10, 11, 12}, "B", opts));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        DataGeneratorOptions opts;
        opts.stream_id = StreamID(4);
        SendQueue::DataToSend dts(OutgoingMessageId(45),
                                  gen_.Ordered({13, 14, 15, 16}, "B", opts));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
                                          Pair(TSN(12), _), Pair(TSN(13), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight),  //
                          Pair(TSN(13), State::kInFlight)));

  // Chunk 13 is acked, but the remaining are lost
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(4, 4)}, {}));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),    //
                          Pair(TSN(10), State::kNacked),  //
                          Pair(TSN(11), State::kNacked),  //
                          Pair(TSN(12), State::kNacked),  //
                          Pair(TSN(13), State::kAcked)));

  EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(42)))
      .WillOnce(Return(true));
  EXPECT_CALL(producer_, Discard(StreamID(2), OutgoingMessageId(43)))
      .WillOnce(Return(true));
  EXPECT_CALL(producer_, Discard(StreamID(3), OutgoingMessageId(44)))
      .WillOnce(Return(true));

  queue.HandleT3RtxTimerExpiry();

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),       //
                          Pair(TSN(10), State::kAbandoned),  //
                          Pair(TSN(11), State::kAbandoned),  //
                          Pair(TSN(12), State::kAbandoned),  //
                          Pair(TSN(13), State::kAcked),
                          // Representing end fragments of stream 1-3
                          Pair(TSN(14), State::kAbandoned),  //
                          Pair(TSN(15), State::kAbandoned),  //
                          Pair(TSN(16), State::kAbandoned)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
  EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
  EXPECT_THAT(
      forward_tsn1.skipped_streams(),
      UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
                               IsUnordered(false), StreamID(1), MID(42)),
                           IForwardTsnChunk::SkippedStream(
                               IsUnordered(true), StreamID(2), MID(42)),
                           IForwardTsnChunk::SkippedStream(
                               IsUnordered(false), StreamID(3), MID(42))));

  // When TSN 13 is acked, the placeholder "end fragments" must be skipped as
  // well.

  // A receiver is more likely to ack TSN 13, but do it incrementally.
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));

  EXPECT_CALL(producer_, Discard).Times(0);
  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}));
  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(13), State::kAcked),      //
                          Pair(TSN(14), State::kAbandoned),  //
                          Pair(TSN(15), State::kAbandoned),  //
                          Pair(TSN(16), State::kAbandoned)));

  IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn();
  EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16));
  EXPECT_THAT(
      forward_tsn2.skipped_streams(),
      UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
                               IsUnordered(false), StreamID(1), MID(42)),
                           IForwardTsnChunk::SkippedStream(
                               IsUnordered(true), StreamID(2), MID(42)),
                           IForwardTsnChunk::SkippedStream(
                               IsUnordered(false), StreamID(3), MID(42))));
}

TEST_F(RetransmissionQueueTest, MeasureRTT) {
  RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(OutgoingMessageId(0),
                                  gen_.Ordered({1, 2, 3, 4}, "B"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));

  now_ = now_ + TimeDelta::Millis(123);

  EXPECT_CALL(on_rtt_, Call(TimeDelta::Millis(123))).Times(1);
  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
}

TEST_F(RetransmissionQueueTest, ValidateCumTsnAtRest) {
  RetransmissionQueue queue = CreateQueue(/*use_message_interleaving=*/true);

  EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
  EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
}

TEST_F(RetransmissionQueueTest, ValidateCumTsnAckOnInflightData) {
  RetransmissionQueue queue = CreateQueue();

  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
                                   TSN(15), TSN(16), TSN(17)));

  EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(8), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(9), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(14), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(15), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(16), kArwnd, {}, {})));
  EXPECT_TRUE(queue.HandleSack(now_, SackChunk(TSN(17), kArwnd, {}, {})));
  EXPECT_FALSE(queue.HandleSack(now_, SackChunk(TSN(18), kArwnd, {}, {})));
}

TEST_F(RetransmissionQueueTest, HandleGapAckBlocksMatchingNoInflightData) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
                                   TSN(15), TSN(16), TSN(17)));

  // Ack 9, 20-25. This is an invalid SACK, but should still be handled.
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(11, 16)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight),  //
                          Pair(TSN(13), State::kInFlight),  //
                          Pair(TSN(14), State::kInFlight),  //
                          Pair(TSN(15), State::kInFlight),  //
                          Pair(TSN(16), State::kInFlight),  //
                          Pair(TSN(17), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, HandleInvalidGapAckBlocks) {
  RetransmissionQueue queue = CreateQueue();

  // Nothing produced - nothing in retransmission queue

  // Ack 9, 12-13
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(3, 4)}, {}));

  // Gap ack blocks are just ignore.
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked)));
}

TEST_F(RetransmissionQueueTest, GapAckBlocksDoNotMoveCumTsnAck) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue),
              testing::ElementsAre(TSN(10), TSN(11), TSN(12), TSN(13), TSN(14),
                                   TSN(15), TSN(16), TSN(17)));

  // Ack 9, 10-14. This is actually an invalid ACK as the first gap can't be
  // adjacent to the cum-tsn-ack, but it's not strictly forbidden. However, the
  // cum-tsn-ack should not move, as the gap-ack-blocks are just advisory.
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(1, 5)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kAcked),     //
                          Pair(TSN(11), State::kAcked),     //
                          Pair(TSN(12), State::kAcked),     //
                          Pair(TSN(13), State::kAcked),     //
                          Pair(TSN(14), State::kAcked),     //
                          Pair(TSN(15), State::kInFlight),  //
                          Pair(TSN(16), State::kInFlight),  //
                          Pair(TSN(17), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, StaysWithinAvailableSize) {
  RetransmissionQueue queue = CreateQueue();

  // See SctpPacketTest::ReturnsCorrectSpaceAvailableToStayWithinMTU for the
  // magic numbers in this test.
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t size) {
        EXPECT_EQ(size, 1176 - DataChunk::kHeaderSize);

        std::vector<uint8_t> payload(183);
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, "BE"));
      })
      .WillOnce([this](Timestamp, size_t size) {
        EXPECT_EQ(size, 976 - DataChunk::kHeaderSize);

        std::vector<uint8_t> payload(957);
        return SendQueue::DataToSend(OutgoingMessageId(1),
                                     gen_.Ordered(payload, "BE"));
      });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1188 - 12);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _)));
}

TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "B"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({5, 6, 7, 8}, ""));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId,
                                  gen_.Ordered({9, 10, 11, 12}, ""));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  // Send and ack first chunk (TSN 10)
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
                                          Pair(TSN(12), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight)));
  EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
  EXPECT_EQ(queue.outstanding_items(), 3u);

  // Mark the message as lost.
  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(1);
  queue.HandleT3RtxTimerExpiry();

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),       //
                          Pair(TSN(10), State::kAbandoned),  //
                          Pair(TSN(11), State::kAbandoned),  //
                          Pair(TSN(12), State::kAbandoned)));
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);

  // Now ACK those, one at a time.
  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);

  queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);

  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.outstanding_items(), 0u);
}

TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {
  RetransmissionQueue queue = CreateQueue();
  DataGeneratorOptions options;
  options.stream_id = StreamID(17);
  options.mid = MID(42);
  Timestamp test_start = now_;
  EXPECT_CALL(producer_, Produce)
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId,
                                  gen_.Ordered({1, 2, 3, 4}, "B", options));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId,
                                  gen_.Ordered({5, 6, 7, 8}, "", options));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 24);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));

  EXPECT_CALL(producer_, Discard(StreamID(17), kMessageId))
      .WillOnce(Return(true));
  now_ += TimeDelta::Millis(100);

  EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());

  EXPECT_THAT(
      queue.GetChunkStatesForTesting(),
      ElementsAre(Pair(TSN(9), State::kAcked),         // Initial TSN
                  Pair(TSN(10), State::kAbandoned),    // Produced
                  Pair(TSN(11), State::kAbandoned),    // Produced and expired
                  Pair(TSN(12), State::kAbandoned)));  // Placeholder end
}

TEST_F(RetransmissionQueueTest, ExpireCorrectMessageFromSendQueue) {
  RetransmissionQueue queue = CreateQueue();
  Timestamp test_start = now_;
  EXPECT_CALL(producer_, Produce)
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(
            OutgoingMessageId(42),
            gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(
            OutgoingMessageId(43),
            gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      // Stream reset - MID reset to zero again.
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(
            OutgoingMessageId(44),
            gen_.Ordered({1, 2, 3, 4}, "B", {.mid = MID(0)}));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      .WillOnce([&](Timestamp, size_t) {
        SendQueue::DataToSend dts(
            OutgoingMessageId(44),
            gen_.Ordered({5, 6, 7, 8}, "", {.mid = MID(0)}));
        dts.expires_at = Timestamp(test_start + TimeDelta::Millis(10));
        return dts;
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(44)))
      .WillOnce(Return(true));

  EXPECT_THAT(queue.GetChunksToSend(now_, 24),
              ElementsAre(Pair(TSN(10), Field(&Data::mid, MID(0)))));
  EXPECT_THAT(queue.GetChunksToSend(now_, 24),
              ElementsAre(Pair(TSN(11), Field(&Data::mid, MID(1)))));
  EXPECT_THAT(queue.GetChunksToSend(now_, 24),
              ElementsAre(Pair(TSN(12), Field(&Data::mid, MID(0)))));

  now_ += TimeDelta::Millis(100);
  EXPECT_THAT(queue.GetChunksToSend(now_, 24), IsEmpty());

  EXPECT_THAT(
      queue.GetChunkStatesForTesting(),
      ElementsAre(Pair(TSN(9), State::kAcked),       // Initial TSN
                  Pair(TSN(10), State::kInFlight),   // OutgoingMessageId=42, BE
                  Pair(TSN(11), State::kInFlight),   // OutgoingMessageId=43, BE
                  Pair(TSN(12), State::kAbandoned),  // OutgoingMessageId=44, B
                  Pair(TSN(13), State::kAbandoned),  // Produced and expired
                  Pair(TSN(14), State::kAbandoned)));  // Placeholder end
}

TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
        dts.max_retransmissions = MaxRetransmits(0);
        return dts;
      })
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
                                          Pair(TSN(12), _), Pair(TSN(13), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight),  //
                          Pair(TSN(13), State::kInFlight)));

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId)).Times(0);

  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 2)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kNacked),    //
                          Pair(TSN(11), State::kAcked),     //
                          Pair(TSN(12), State::kInFlight),  //
                          Pair(TSN(13), State::kInFlight)));

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 3)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),    //
                          Pair(TSN(10), State::kNacked),  //
                          Pair(TSN(11), State::kAcked),   //
                          Pair(TSN(12), State::kAcked),   //
                          Pair(TSN(13), State::kInFlight)));

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
      .WillOnce(Return(false));
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 4)}, {}));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),       //
                          Pair(TSN(10), State::kAbandoned),  //
                          Pair(TSN(11), State::kAcked),      //
                          Pair(TSN(12), State::kAcked),      //
                          Pair(TSN(13), State::kAcked)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
}

TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
  // This is a fairly long test.
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this](Timestamp, size_t) {
        SendQueue::DataToSend dts(kMessageId, gen_.Ordered({1, 2, 3, 4}, "BE"));
        dts.max_retransmissions = MaxRetransmits(2);
        return dts;
      })
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillOnce(CreateChunk(OutgoingMessageId(8)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1000);
  EXPECT_THAT(chunks_to_send,
              ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _),
                          Pair(TSN(13), _), Pair(TSN(14), _), Pair(TSN(15), _),
                          Pair(TSN(16), _), Pair(TSN(17), _), Pair(TSN(18), _),
                          Pair(TSN(19), _)));

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight),  //
                          Pair(TSN(13), State::kInFlight),  //
                          Pair(TSN(14), State::kInFlight),  //
                          Pair(TSN(15), State::kInFlight),  //
                          Pair(TSN(16), State::kInFlight),  //
                          Pair(TSN(17), State::kInFlight),  //
                          Pair(TSN(18), State::kInFlight),  //
                          Pair(TSN(19), State::kInFlight)));

  EXPECT_CALL(producer_, Discard(StreamID(1), OutgoingMessageId(8))).Times(0);

  // Ack TSN [11 to 13] - three nacks for TSN(10), which will retransmit it.
  for (int tsn = 11; tsn <= 13; ++tsn) {
    queue.HandleSack(
        now_,
        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
  }

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),               //
                          Pair(TSN(10), State::kToBeRetransmitted),  //
                          Pair(TSN(11), State::kAcked),              //
                          Pair(TSN(12), State::kAcked),              //
                          Pair(TSN(13), State::kAcked),              //
                          Pair(TSN(14), State::kInFlight),           //
                          Pair(TSN(15), State::kInFlight),           //
                          Pair(TSN(16), State::kInFlight),           //
                          Pair(TSN(17), State::kInFlight),           //
                          Pair(TSN(18), State::kInFlight),           //
                          Pair(TSN(19), State::kInFlight)));

  EXPECT_THAT(queue.GetChunksForFastRetransmit(1000),
              ElementsAre(Pair(TSN(10), _)));

  // Ack TSN [14 to 16] - three more nacks - second and last retransmission.
  for (int tsn = 14; tsn <= 16; ++tsn) {
    queue.HandleSack(
        now_,
        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
  }

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),               //
                          Pair(TSN(10), State::kToBeRetransmitted),  //
                          Pair(TSN(11), State::kAcked),              //
                          Pair(TSN(12), State::kAcked),              //
                          Pair(TSN(13), State::kAcked),              //
                          Pair(TSN(14), State::kAcked),              //
                          Pair(TSN(15), State::kAcked),              //
                          Pair(TSN(16), State::kAcked),              //
                          Pair(TSN(17), State::kInFlight),           //
                          Pair(TSN(18), State::kInFlight),           //
                          Pair(TSN(19), State::kInFlight)));

  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), ElementsAre(Pair(TSN(10), _)));

  // Ack TSN [17 to 18]
  for (int tsn = 17; tsn <= 18; ++tsn) {
    queue.HandleSack(
        now_,
        SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, (tsn - 9))}, {}));
  }

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),    //
                          Pair(TSN(10), State::kNacked),  //
                          Pair(TSN(11), State::kAcked),   //
                          Pair(TSN(12), State::kAcked),   //
                          Pair(TSN(13), State::kAcked),   //
                          Pair(TSN(14), State::kAcked),   //
                          Pair(TSN(15), State::kAcked),   //
                          Pair(TSN(16), State::kAcked),   //
                          Pair(TSN(17), State::kAcked),   //
                          Pair(TSN(18), State::kAcked),   //
                          Pair(TSN(19), State::kInFlight)));

  EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));

  // Ack TSN 19 - three more nacks for TSN 10, no more retransmissions.
  EXPECT_CALL(producer_, Discard(StreamID(1), kMessageId))
      .WillOnce(Return(false));
  queue.HandleSack(
      now_, SackChunk(TSN(9), kArwnd, {SackChunk::GapAckBlock(2, 10)}, {}));

  EXPECT_THAT(queue.GetChunksToSend(now_, 1000), IsEmpty());

  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),       //
                          Pair(TSN(10), State::kAbandoned),  //
                          Pair(TSN(11), State::kAcked),      //
                          Pair(TSN(12), State::kAcked),      //
                          Pair(TSN(13), State::kAcked),      //
                          Pair(TSN(14), State::kAcked),      //
                          Pair(TSN(15), State::kAcked),      //
                          Pair(TSN(16), State::kAcked),      //
                          Pair(TSN(17), State::kAcked),      //
                          Pair(TSN(18), State::kAcked),      //
                          Pair(TSN(19), State::kAcked)));

  EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
}

TEST_F(RetransmissionQueueTest, CwndRecoversWhenAcking) {
  RetransmissionQueue queue = CreateQueue();
  static constexpr size_t kCwnd = 1200;
  queue.set_cwnd(kCwnd);
  EXPECT_EQ(queue.cwnd(), kCwnd);

  std::vector<uint8_t> payload(1000);
  EXPECT_CALL(producer_, Produce)
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 1500);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));
  size_t serialized_size = payload.size() + DataChunk::kHeaderSize;
  EXPECT_EQ(queue.outstanding_bytes(), serialized_size);

  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));

  EXPECT_EQ(queue.cwnd(), kCwnd + serialized_size);
}

// Verifies that it doesn't produce tiny packets, when getting close to
// the full congestion window.
TEST_F(RetransmissionQueueTest, OnlySendsLargePacketsOnLargeCongestionWindow) {
  RetransmissionQueue queue = CreateQueue();
  size_t intial_cwnd = options_.avoid_fragmentation_cwnd_mtus * options_.mtu;
  queue.set_cwnd(intial_cwnd);
  EXPECT_EQ(queue.cwnd(), intial_cwnd);

  // Fill the congestion window almost - leaving 500 bytes.
  size_t chunk_size = intial_cwnd - 500;
  EXPECT_CALL(producer_, Produce)
      .WillOnce([chunk_size, this](Timestamp, size_t) {
        return SendQueue::DataToSend(
            OutgoingMessageId(0),
            gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_TRUE(queue.can_send_data());
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 10000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));

  // To little space left - will not send more.
  EXPECT_FALSE(queue.can_send_data());

  // But when the first chunk is acked, it will continue.
  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));

  EXPECT_TRUE(queue.can_send_data());
  EXPECT_EQ(queue.outstanding_bytes(), 0u);
  EXPECT_EQ(queue.cwnd(), intial_cwnd + kMaxMtu);
}

TEST_F(RetransmissionQueueTest, AllowsSmallFragmentsOnSmallCongestionWindow) {
  RetransmissionQueue queue = CreateQueue();
  size_t intial_cwnd =
      options_.avoid_fragmentation_cwnd_mtus * options_.mtu - 1;
  queue.set_cwnd(intial_cwnd);
  EXPECT_EQ(queue.cwnd(), intial_cwnd);

  // Fill the congestion window almost - leaving 500 bytes.
  size_t chunk_size = intial_cwnd - 500;
  EXPECT_CALL(producer_, Produce)
      .WillOnce([chunk_size, this](Timestamp, size_t) {
        return SendQueue::DataToSend(
            OutgoingMessageId(0),
            gen_.Ordered(std::vector<uint8_t>(chunk_size), "BE"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_TRUE(queue.can_send_data());
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 10000);
  EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _)));

  // With congestion window under limit, allow small packets to be created.
  EXPECT_TRUE(queue.can_send_data());
}

TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenHasNoOutstandingData) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));
  EXPECT_EQ(
      queue.GetHandoverReadiness(),
      HandoverReadinessStatus(
          HandoverUnreadinessReason::kRetransmissionQueueOutstandingData));

  queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
  EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus());
}

TEST_F(RetransmissionQueueTest, ReadyForHandoverWhenNothingToRetransmit) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillOnce(CreateChunk(OutgoingMessageId(5)))
      .WillOnce(CreateChunk(OutgoingMessageId(6)))
      .WillOnce(CreateChunk(OutgoingMessageId(7)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(8));
  EXPECT_EQ(
      queue.GetHandoverReadiness(),
      HandoverReadinessStatus(
          HandoverUnreadinessReason::kRetransmissionQueueOutstandingData));

  // Send more chunks, but leave some chunks unacked to force retransmission
  // after three NACKs.

  // Send 18
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(8)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));

  // Ack 12, 14-15, 17-18
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 6)},
                                   {}));

  // Send 19
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(9)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));

  // Ack 12, 14-15, 17-19
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 7)},
                                   {}));

  // Send 20
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(10)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(1));

  // Ack 12, 14-15, 17-20
  // This will trigger "fast retransmit" mode and only chunks 13 and 16 will be
  // resent right now. The send queue will not even be queried.
  queue.HandleSack(now_, SackChunk(TSN(12), kArwnd,
                                   {SackChunk::GapAckBlock(2, 3),
                                    SackChunk::GapAckBlock(5, 8)},
                                   {}));
  EXPECT_EQ(
      queue.GetHandoverReadiness(),
      HandoverReadinessStatus()
          .Add(HandoverUnreadinessReason::kRetransmissionQueueOutstandingData)
          .Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery)
          .Add(HandoverUnreadinessReason::kRetransmissionQueueNotEmpty));

  // Send "fast retransmit" mode chunks
  EXPECT_CALL(producer_, Produce).Times(0);
  EXPECT_THAT(GetTSNsForFastRetransmit(queue), SizeIs(2));
  EXPECT_EQ(
      queue.GetHandoverReadiness(),
      HandoverReadinessStatus()
          .Add(HandoverUnreadinessReason::kRetransmissionQueueOutstandingData)
          .Add(HandoverUnreadinessReason::kRetransmissionQueueFastRecovery));

  // Ack 20 to confirm the retransmission
  queue.HandleSack(now_, SackChunk(TSN(20), kArwnd, {}, {}));
  EXPECT_EQ(queue.GetHandoverReadiness(), HandoverReadinessStatus());
}

TEST_F(RetransmissionQueueTest, HandoverTest) {
  RetransmissionQueue queue = CreateQueue();
  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(0)))
      .WillOnce(CreateChunk(OutgoingMessageId(1)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(queue), SizeIs(2));
  queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));

  std::unique_ptr<RetransmissionQueue> handedover_queue =
      CreateQueueByHandover(queue);

  EXPECT_CALL(producer_, Produce)
      .WillOnce(CreateChunk(OutgoingMessageId(2)))
      .WillOnce(CreateChunk(OutgoingMessageId(3)))
      .WillOnce(CreateChunk(OutgoingMessageId(4)))
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });
  EXPECT_THAT(GetSentPacketTSNs(*handedover_queue),
              testing::ElementsAre(TSN(12), TSN(13), TSN(14)));

  handedover_queue->HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}));
  EXPECT_THAT(handedover_queue->GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(13), State::kAcked),  //
                          Pair(TSN(14), State::kInFlight)));
}

TEST_F(RetransmissionQueueTest, CanAlwaysSendOnePacket) {
  RetransmissionQueue queue = CreateQueue();

  // A large payload - enough to not fit two DATA in same packet.
  size_t mtu = RoundDownTo4(options_.mtu);
  std::vector<uint8_t> payload(mtu - 100);

  EXPECT_CALL(producer_, Produce)
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, "B"));
      })
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, ""));
      })
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, ""));
      })
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, ""));
      })
      .WillOnce([this, payload](Timestamp, size_t) {
        return SendQueue::DataToSend(OutgoingMessageId(0),
                                     gen_.Ordered(payload, "E"));
      })
      .WillRepeatedly([](Timestamp, size_t) { return absl::nullopt; });

  // Produce all chunks and put them in the retransmission queue.
  std::vector<std::pair<TSN, Data>> chunks_to_send =
      queue.GetChunksToSend(now_, 5 * mtu);
  EXPECT_THAT(chunks_to_send,
              ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), Pair(TSN(12), _),
                          Pair(TSN(13), _), Pair(TSN(14), _)));
  EXPECT_THAT(queue.GetChunkStatesForTesting(),
              ElementsAre(Pair(TSN(9), State::kAcked),      //
                          Pair(TSN(10), State::kInFlight),  //
                          Pair(TSN(11), State::kInFlight),  //
                          Pair(TSN(12), State::kInFlight),
                          Pair(TSN(13), State::kInFlight),
                          Pair(TSN(14), State::kInFlight)));

  // Ack 12, and report an empty receiver window (the peer obviously has a
  // tiny receive window).
  queue.HandleSack(
      now_, SackChunk(TSN(9), /*rwnd=*/0, {SackChunk::GapAckBlock(3, 3)}, {}));

  // Force TSN 10 to be retransmitted.
  queue.HandleT3RtxTimerExpiry();

  // Even if the receiver window is empty, it will allow TSN 10 to be sent.
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(10), _)));

  // But not more than that, as there now is outstanding data.
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty());

  // Don't ack any new data, and still have receiver window zero.
  queue.HandleSack(
      now_, SackChunk(TSN(9), /*rwnd=*/0, {SackChunk::GapAckBlock(3, 3)}, {}));

  // There is in-flight data, so new data should not be allowed to be send since
  // the receiver window is full.
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty());

  // Ack that packet (no more in-flight data), but still report an empty
  // receiver window.
  queue.HandleSack(
      now_, SackChunk(TSN(10), /*rwnd=*/0, {SackChunk::GapAckBlock(2, 2)}, {}));

  // Then TSN 11 can be sent, as there is no in-flight data.
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(11), _)));
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty());

  // Ack and recover the receiver window
  queue.HandleSack(now_, SackChunk(TSN(12), /*rwnd=*/5 * mtu, {}, {}));

  // That will unblock sending remaining chunks.
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(13), _)));
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), ElementsAre(Pair(TSN(14), _)));
  EXPECT_THAT(queue.GetChunksToSend(now_, mtu), IsEmpty());
}

}  // namespace
}  // namespace dcsctp
