dcsctp: Add priority support to send queue

This mainly modifies the stream scheduler to add a weighted fair queuing
algorithm in addition to its round robin algorithm. The WFQ algorithm is
selected whenever interleaving is enabled, to ensure that the socket
stays backwards compatible in the normal (non-interleaved) scenario.

Adaptation to send queue and socket comes in a follow-up CL.

Bug: webrtc:5696
Change-Id: I8f0dbfa8c2f40f2e84cee536ea821e7ef4af6310
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261947
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37330}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index ae81db8..e691b76 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -55,7 +55,9 @@
     "../../../rtc_base:strong_alias",
     "../../../rtc_base/containers:flat_set",
     "../common:str_join",
+    "../packet:chunk",
     "../packet:data",
+    "../packet:sctp_packet",
     "../public:socket",
     "../public:types",
   ]
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index ee8bf82..bec6f08 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -39,6 +39,8 @@
     : log_prefix_(std::string(log_prefix) + "fcfs: "),
       buffer_size_(buffer_size),
       default_priority_(default_priority),
+      // TODO(webrtc:5696): Provide correct MTU.
+      scheduler_(DcSctpOptions::kMaxSafeMTUSize),
       on_buffered_amount_low_(std::move(on_buffered_amount_low)),
       total_buffered_amount_(std::move(on_total_buffered_amount_low)) {
   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
@@ -482,7 +484,7 @@
                                     StreamPriority priority) {
   OutgoingStream& stream = GetOrCreateStreamInfo(stream_id);
 
-  stream.set_priority(priority);
+  stream.SetPriority(priority);
   RTC_DCHECK(IsConsistent());
 }
 
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index eea814c..c2f1ee8 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -171,8 +171,8 @@
     bool has_partially_sent_message() const;
 
     StreamPriority priority() const { return scheduler_stream_->priority(); }
-    void set_priority(StreamPriority priority) {
-      scheduler_stream_->set_priority(priority);
+    void SetPriority(StreamPriority priority) {
+      scheduler_stream_->SetPriority(priority);
     }
 
     void AddHandoverState(
diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc
index 1d15aec..d1560a7 100644
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -9,6 +9,8 @@
  */
 #include "net/dcsctp/tx/stream_scheduler.h"
 
+#include <algorithm>
+
 #include "absl/algorithm/container.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
@@ -23,14 +25,19 @@
 
 namespace dcsctp {
 
-void StreamScheduler::Stream::set_priority(StreamPriority priority) {
+void StreamScheduler::Stream::SetPriority(StreamPriority priority) {
   priority_ = priority;
+  inverse_weight_ = InverseWeight(priority);
 }
 
 absl::optional<SendQueue::DataToSend> StreamScheduler::Produce(
     TimeMs now,
     size_t max_size) {
-  bool rescheduling = !currently_sending_a_message_;
+  // For non-interleaved streams, avoid rescheduling while still sending a
+  // message as it needs to be sent in full. For interleaved messaging,
+  // reschedule for every I-DATA chunk sent.
+  bool rescheduling =
+      enable_message_interleaving_ || !currently_sending_a_message_;
 
   RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling
                       << ", active="
@@ -92,7 +99,7 @@
   // in `active_streams`.
   size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message();
   if (rescheduling && bytes_to_send_next > 0) {
-    current_stream_->MakeActive();
+    current_stream_->MakeActive(bytes_to_send_next);
   } else if (!rescheduling && bytes_to_send_next == 0) {
     current_stream_->MakeInactive();
   }
@@ -101,13 +108,19 @@
   return data;
 }
 
-StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
-    const {
-  // Implement round-robin by letting the stream have its next virtual finish
-  // time in the future. It doesn't matter how far into the future, just any
-  // positive number so that any other stream that has the same virtual finish
-  // time as this stream gets to produce their data before revisiting this
-  // stream.
+StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime(
+    size_t bytes_to_send_next) const {
+  if (parent_.enable_message_interleaving_) {
+    // Perform weighted fair queuing scheduling.
+    return VirtualTime(*current_virtual_time_ +
+                       bytes_to_send_next * *inverse_weight_);
+  }
+
+  // Perform round-robin scheduling by letting the stream have its next virtual
+  // finish time in the future. It doesn't matter how far into the future, just
+  // any positive number so that any other stream that has the same virtual
+  // finish time as this stream gets to produce their data before revisiting
+  // this stream.
   return VirtualTime(*current_virtual_time_ + 1);
 }
 
@@ -117,7 +130,7 @@
   absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
 
   if (data.has_value()) {
-    VirtualTime new_current = GetNextFinishTime();
+    VirtualTime new_current = CalculateFinishTime(data->data.payload.size());
     RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_
                          << " -> " << *new_current;
     current_virtual_time_ = new_current;
@@ -140,19 +153,22 @@
 void StreamScheduler::Stream::MaybeMakeActive() {
   RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")";
   RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
-  if (bytes_to_send_in_next_message() == 0) {
+  size_t bytes_to_send_next = bytes_to_send_in_next_message();
+  if (bytes_to_send_next == 0) {
     return;
   }
 
-  MakeActive();
+  MakeActive(bytes_to_send_next);
 }
 
-void StreamScheduler::Stream::MakeActive() {
+void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) {
   current_virtual_time_ = parent_.virtual_time_;
-  VirtualTime next_finish_time = GetNextFinishTime();
+  RTC_DCHECK_GT(bytes_to_send_next, 0);
+  VirtualTime next_finish_time = CalculateFinishTime(
+      std::min(bytes_to_send_next, parent_.max_payload_bytes_));
+  RTC_DCHECK_GT(*next_finish_time, 0);
   RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id()
                        << " active, expiring at " << *next_finish_time;
-  RTC_DCHECK_GT(*next_finish_time, 0);
   RTC_DCHECK(next_finish_time_ == VirtualTime::Zero());
   next_finish_time_ = next_finish_time;
   RTC_DCHECK(!absl::c_any_of(parent_.active_streams_,
diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h
index e76f474..9c523ed 100644
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -10,6 +10,7 @@
 #ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_
 #define NET_DCSCTP_TX_STREAM_SCHEDULER_H_
 
+#include <algorithm>
 #include <cstdint>
 #include <deque>
 #include <map>
@@ -24,6 +25,8 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
+#include "net/dcsctp/packet/chunk/idata_chunk.h"
+#include "net/dcsctp/packet/sctp_packet.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
@@ -42,9 +45,21 @@
 // with a "virtual finish time", which is the time when a stream is allowed to
 // produce data. Streams are ordered by their virtual finish time, and the
 // "current virtual time" will advance to the next following virtual finish time
-// whenever a chunk is to be produced. In the initial round-robin scheduling
-// algorithm, a stream's virtual finish time will just increment by one (1)
-// after having produced a chunk, which results in a round-robin scheduling.
+// whenever a chunk is to be produced.
+//
+// When message interleaving is enabled, the WFQ - Weighted Fair Queueing -
+// scheduling algorithm will be used. And when it's not, round-robin scheduling
+// will be used instead.
+//
+// In the round robin scheduling algorithm, a stream's virtual finish time will
+// just increment by one (1) after having produced a chunk, which results in a
+// round-robin scheduling.
+//
+// In WFQ scheduling algorithm, a stream's virtual finish time will be defined
+// as the number of bytes in the next fragment to be sent, multiplied by the
+// inverse of the stream's priority, meaning that a high priority - or a smaller
+// fragment - results in a closer virtual finish time, compared to a stream with
+// either a lower priority or a larger fragment to be sent.
 class StreamScheduler {
  private:
   class VirtualTime : public webrtc::StrongAlias<class VirtualTimeTag, double> {
@@ -54,6 +69,13 @@
 
     static constexpr VirtualTime Zero() { return VirtualTime(0); }
   };
+  class InverseWeight
+      : public webrtc::StrongAlias<class InverseWeightTag, double> {
+   public:
+    constexpr explicit InverseWeight(StreamPriority priority)
+        : webrtc::StrongAlias<class InverseWeightTag, double>(
+              1.0 / std::max(static_cast<double>(*priority), 0.000001)) {}
+  };
 
  public:
   class StreamProducer {
@@ -79,7 +101,7 @@
     StreamID stream_id() const { return stream_id_; }
 
     StreamPriority priority() const { return priority_; }
-    void set_priority(StreamPriority priority);
+    void SetPriority(StreamPriority priority);
 
     // Will activate the stream _if_ it has any data to send. That is, if the
     // callback to `bytes_to_send_in_next_message` returns non-zero. If the
@@ -105,13 +127,14 @@
         : parent_(*parent),
           producer_(*producer),
           stream_id_(stream_id),
-          priority_(priority) {}
+          priority_(priority),
+          inverse_weight_(priority) {}
 
     // Produces a message from this stream. This will only be called on streams
     // that have data.
     absl::optional<SendQueue::DataToSend> Produce(TimeMs now, size_t max_size);
 
-    void MakeActive();
+    void MakeActive(size_t bytes_to_send_next);
     void ForceMarkInactive();
 
     VirtualTime current_time() const { return current_virtual_time_; }
@@ -120,24 +143,34 @@
       return producer_.bytes_to_send_in_next_message();
     }
 
-    // Returns the next virtual finish time for this stream.
-    VirtualTime GetNextFinishTime() const;
+    VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const;
 
     StreamScheduler& parent_;
     StreamProducer& producer_;
     const StreamID stream_id_;
     StreamPriority priority_;
+    InverseWeight inverse_weight_;
     // This outgoing stream's "current" virtual_time.
     VirtualTime current_virtual_time_ = VirtualTime::Zero();
     VirtualTime next_finish_time_ = VirtualTime::Zero();
   };
 
+  // The `mtu` parameter represents the maximum SCTP packet size, which should
+  // be the same as `DcSctpOptions::mtu`.
+  explicit StreamScheduler(size_t mtu)
+      : max_payload_bytes_(mtu - SctpPacket::kHeaderSize -
+                           IDataChunk::kHeaderSize) {}
+
   std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
                                        StreamID stream_id,
                                        StreamPriority priority) {
     return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
   }
 
+  void EnableMessageInterleaving(bool enabled) {
+    enable_message_interleaving_ = enabled;
+  }
+
   // Makes the scheduler stop producing message from the current stream and
   // re-evaluates which stream to produce from.
   void ForceReschedule() { currently_sending_a_message_ = false; }
@@ -165,14 +198,19 @@
 
   bool IsConsistent() const;
 
+  const size_t max_payload_bytes_;
+
   // The current virtual time, as defined in the WFQ algorithm.
   VirtualTime virtual_time_ = VirtualTime::Zero();
 
   // The current stream to send chunks from.
   Stream* current_stream_ = nullptr;
 
+  bool enable_message_interleaving_ = false;
+
   // Indicates if the streams is currently sending a message, and should then
-  // continue sending from this stream until that message has been sent in full.
+  // - if message interleaving is not enabled - continue sending from this
+  // stream until that message has been sent in full.
   bool currently_sending_a_message_ = false;
 
   // The currently active streams, ordered by virtual finish time.
diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc
index 0c239fe..58f0bc4 100644
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -11,6 +11,7 @@
 
 #include <vector>
 
+#include "net/dcsctp/packet/sctp_packet.h"
 #include "net/dcsctp/public/types.h"
 #include "test/gmock.h"
 
@@ -91,14 +92,14 @@
 
 // A scheduler without active streams doesn't produce data.
 TEST(StreamSchedulerTest, HasNoActiveStreams) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
 }
 
 // Stream properties can be set and retrieved
 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer;
   auto stream =
@@ -107,13 +108,13 @@
   EXPECT_EQ(stream->stream_id(), StreamID(1));
   EXPECT_EQ(stream->priority(), StreamPriority(2));
 
-  stream->set_priority(StreamPriority(0));
+  stream->SetPriority(StreamPriority(0));
   EXPECT_EQ(stream->priority(), StreamPriority(0));
 }
 
 // A scheduler with a single stream produced packets from it.
 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer;
   EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
@@ -130,7 +131,7 @@
 
 // Switches between two streams after every packet.
 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer1;
   EXPECT_CALL(producer1, Produce)
@@ -172,7 +173,7 @@
 // Switches between two streams after every packet, but keeps producing from the
 // same stream when a packet contains of multiple fragments.
 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer1;
   EXPECT_CALL(producer1, Produce)
@@ -234,7 +235,7 @@
 
 // Deactivates a stream before it has finished producing all packets.
 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer1;
   EXPECT_CALL(producer1, Produce)
@@ -258,7 +259,7 @@
 
 // Resumes a paused stream - makes a stream active after inactivating it.
 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer1;
   // Callbacks are setup so that they hint that there is a MID(2) coming...
@@ -288,7 +289,7 @@
 
 // Iterates between streams, where one is suddenly paused and later resumed.
 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
 
   StrictMock<MockStreamProducer> producer1;
   EXPECT_CALL(producer1, Produce)
@@ -332,7 +333,7 @@
 
 // Verifies that packet counts are evenly distributed in round robin scheduling.
 TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
   TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
   TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
 
@@ -345,7 +346,7 @@
 // where a stream is suddenly made inactive, two are added, and then the paused
 // stream is resumed.
 TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
-  StreamScheduler scheduler;
+  StreamScheduler scheduler(kMtu);
   TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
   TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
 
@@ -373,5 +374,367 @@
   EXPECT_EQ(counts3[StreamID(4)], 5U);
 }
 
+// Degrades to fair queuing with streams having identical priority.
+TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
+  StreamScheduler scheduler(kMtu);
+  scheduler.EnableMessageInterleaving(true);
+
+  constexpr size_t kSmallPacket = 30;
+  constexpr size_t kLargePacket = 70;
+
+  StrictMock<MockStreamProducer> callback1;
+  EXPECT_CALL(callback1, Produce)
+      .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
+      .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
+      .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+      .WillOnce(Return(kSmallPacket))  // When making active
+      .WillOnce(Return(kSmallPacket))
+      .WillOnce(Return(kSmallPacket))
+      .WillOnce(Return(0));
+  auto stream1 =
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+  stream1->MaybeMakeActive();
+
+  StrictMock<MockStreamProducer> callback2;
+  EXPECT_CALL(callback2, Produce)
+      .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
+      .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
+      .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+      .WillOnce(Return(kLargePacket))  // When making active
+      .WillOnce(Return(kLargePacket))
+      .WillOnce(Return(kLargePacket))
+      .WillOnce(Return(0));
+  auto stream2 =
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+  stream2->MaybeMakeActive();
+
+  // t = 30
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+  // t = 60
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+  // t = 70
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+  // t = 90
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+  // t = 140
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+  // t = 210
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Will do weighted fair queuing with three streams having different priority.
+TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
+  StreamScheduler scheduler(kMtu);
+  scheduler.EnableMessageInterleaving(true);
+
+  StrictMock<MockStreamProducer> callback1;
+  EXPECT_CALL(callback1, Produce)
+      .WillOnce(CreateChunk(StreamID(1), MID(100)))
+      .WillOnce(CreateChunk(StreamID(1), MID(101)))
+      .WillOnce(CreateChunk(StreamID(1), MID(102)));
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+      .WillOnce(Return(kPayloadSize))  // When making active
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(0));
+  // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
+  auto stream1 =
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
+  stream1->MaybeMakeActive();
+
+  StrictMock<MockStreamProducer> callback2;
+  EXPECT_CALL(callback2, Produce)
+      .WillOnce(CreateChunk(StreamID(2), MID(200)))
+      .WillOnce(CreateChunk(StreamID(2), MID(201)))
+      .WillOnce(CreateChunk(StreamID(2), MID(202)));
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+      .WillOnce(Return(kPayloadSize))  // When making active
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(0));
+  // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
+  auto stream2 =
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
+  stream2->MaybeMakeActive();
+
+  StrictMock<MockStreamProducer> callback3;
+  EXPECT_CALL(callback3, Produce)
+      .WillOnce(CreateChunk(StreamID(3), MID(300)))
+      .WillOnce(CreateChunk(StreamID(3), MID(301)))
+      .WillOnce(CreateChunk(StreamID(3), MID(302)));
+  EXPECT_CALL(callback3, bytes_to_send_in_next_message)
+      .WillOnce(Return(kPayloadSize))  // When making active
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(kPayloadSize))
+      .WillOnce(Return(0));
+  // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
+  auto stream3 =
+      scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
+  stream3->MaybeMakeActive();
+
+  // t ~= 20
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+  // t ~= 40
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+  // t ~= 50
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+  // t ~= 60
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+  // t ~= 80
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+  // t ~= 100
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+  // t ~= 150
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+  // t ~= 160
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+  // t ~= 240
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Will do weighted fair queuing with three streams having different priority
+// and sending different payload sizes.
+TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
+  StreamScheduler scheduler(kMtu);
+  scheduler.EnableMessageInterleaving(true);
+
+  constexpr size_t kSmallPacket = 20;
+  constexpr size_t kMediumPacket = 50;
+  constexpr size_t kLargePacket = 70;
+
+  // Stream with priority = 125 -> inverse weight ~=80
+  StrictMock<MockStreamProducer> callback1;
+  EXPECT_CALL(callback1, Produce)
+      // virtual finish time ~ 0 + 50 * 80 = 4000
+      .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
+      // virtual finish time ~ 4000 + 20 * 80 = 5600
+      .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
+      // virtual finish time ~ 5600 + 70 * 80 = 11200
+      .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+      .WillOnce(Return(kMediumPacket))  // When making active
+      .WillOnce(Return(kSmallPacket))
+      .WillOnce(Return(kLargePacket))
+      .WillOnce(Return(0));
+  auto stream1 =
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
+  stream1->MaybeMakeActive();
+
+  // Stream with priority = 200 -> inverse weight ~=50
+  StrictMock<MockStreamProducer> callback2;
+  EXPECT_CALL(callback2, Produce)
+      // virtual finish time ~ 0 + 50 * 50 = 2500
+      .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
+      // virtual finish time ~ 2500 + 70 * 50 = 6000
+      .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
+      // virtual finish time ~ 6000 + 20 * 50 = 7000
+      .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+      .WillOnce(Return(kMediumPacket))  // When making active
+      .WillOnce(Return(kLargePacket))
+      .WillOnce(Return(kSmallPacket))
+      .WillOnce(Return(0));
+  auto stream2 =
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
+  stream2->MaybeMakeActive();
+
+  // Stream with priority = 500 -> inverse weight ~=20
+  StrictMock<MockStreamProducer> callback3;
+  EXPECT_CALL(callback3, Produce)
+      // virtual finish time ~ 0 + 20 * 20 = 400
+      .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
+      // virtual finish time ~ 400 + 50 * 20 = 1400
+      .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
+      // virtual finish time ~ 1400 + 70 * 20 = 2800
+      .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
+  EXPECT_CALL(callback3, bytes_to_send_in_next_message)
+      .WillOnce(Return(kSmallPacket))  // When making active
+      .WillOnce(Return(kMediumPacket))
+      .WillOnce(Return(kLargePacket))
+      .WillOnce(Return(0));
+  auto stream3 =
+      scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
+  stream3->MaybeMakeActive();
+
+  // t ~= 400
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
+  // t ~= 1400
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
+  // t ~= 2500
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
+  // t ~= 2800
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
+  // t ~= 4000
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
+  // t ~= 5600
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
+  // t ~= 6000
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
+  // t ~= 7000
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
+  // t ~= 11200
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
+  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
+  // A simple test with two streams of different priority, but sending packets
+  // of identical size. Verifies that the ratio of sent packets represent their
+  // priority.
+  StreamScheduler scheduler(kMtu);
+  scheduler.EnableMessageInterleaving(true);
+
+  TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
+  TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
+
+  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15);
+  EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+  EXPECT_EQ(packet_counts[StreamID(2)], 10U);
+}
+
+TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
+  // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more
+  // streams.
+  StreamScheduler scheduler(kMtu);
+  scheduler.EnableMessageInterleaving(true);
+
+  TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
+  TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
+  TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
+  TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);
+
+  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50);
+  EXPECT_EQ(packet_counts[StreamID(1)], 5U);
+  EXPECT_EQ(packet_counts[StreamID(2)], 10U);
+  EXPECT_EQ(packet_counts[StreamID(3)], 15U);
+  EXPECT_EQ(packet_counts[StreamID(4)], 20U);
+}
+
+TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
+  // A simple test with two streams of different priority, but sending packets
+  // of different size. Verifies that the ratio of total packet payload
+  // represent their priority.
+  // In this example,
+  // * stream1 has priority 100 and sends packets of size 8
+  // * stream2 has priority 400 and sends packets of size 4
+  // With round robin, stream1 would get twice as many payload bytes on the wire
+  // as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as
+  // many payload bytes on the wire. That translates to stream2 getting 8x as
+  // many packets on the wire as they are half as large.
+  StreamScheduler scheduler(kMtu);
+  // Enable WFQ scheduler.
+  scheduler.EnableMessageInterleaving(true);
+
+  TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
+                     /*packet_size=*/8);
+  TestStream stream2(scheduler, StreamID(2), StreamPriority(400),
+                     /*packet_size=*/4);
+
+  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90);
+  EXPECT_EQ(packet_counts[StreamID(1)], 10U);
+  EXPECT_EQ(packet_counts[StreamID(2)], 80U);
+}
+
+TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
+  // Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more
+  // complicated.
+  StreamScheduler scheduler(kMtu);
+  // Enable WFQ scheduler.
+  scheduler.EnableMessageInterleaving(true);
+
+  TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
+                     /*packet_size=*/10);
+  TestStream stream2(scheduler, StreamID(2), StreamPriority(200),
+                     /*packet_size=*/10);
+  TestStream stream3(scheduler, StreamID(3), StreamPriority(200),
+                     /*packet_size=*/20);
+  TestStream stream4(scheduler, StreamID(4), StreamPriority(400),
+                     /*packet_size=*/30);
+
+  std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80);
+  // 15 packets * 10 bytes = 150 bytes at priority 100.
+  EXPECT_EQ(packet_counts[StreamID(1)], 15U);
+  // 30 packets * 10 bytes = 300 bytes at priority 200.
+  EXPECT_EQ(packet_counts[StreamID(2)], 30U);
+  // 15 packets * 20 bytes = 300 bytes at priority 200.
+  EXPECT_EQ(packet_counts[StreamID(3)], 15U);
+  // 20 packets * 30 bytes = 600 bytes at priority 400.
+  EXPECT_EQ(packet_counts[StreamID(4)], 20U);
+}
+
+// Sending large messages with small MTU will fragment the messages and produce
+// a first fragment not larger than the MTU, and will then not first send from
+// the stream with the smallest message, as their first fragment will be equally
+// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
+// with a larger MTU.
+TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
+  StreamScheduler scheduler(100 + SctpPacket::kHeaderSize +
+                            IDataChunk::kHeaderSize);
+  scheduler.EnableMessageInterleaving(true);
+
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
+      .WillOnce(CreateChunk(StreamID(1), MID(0), 100))
+      .WillOnce(CreateChunk(StreamID(1), MID(0), 100));
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+      .WillOnce(Return(200))  // When making active
+      .WillOnce(Return(100))
+      .WillOnce(Return(0));
+  auto stream1 =
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
+  stream1->MaybeMakeActive();
+
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
+      .WillOnce(CreateChunk(StreamID(2), MID(1), 100))
+      .WillOnce(CreateChunk(StreamID(2), MID(1), 50));
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+      .WillOnce(Return(150))  // When making active
+      .WillOnce(Return(50))
+      .WillOnce(Return(0));
+  auto stream2 =
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
+  stream2->MaybeMakeActive();
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
+// Sending large messages with large MTU will not fragment messages and will
+// send the message first from the stream that has the smallest message.
+TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
+  StreamScheduler scheduler(200 + SctpPacket::kHeaderSize +
+                            IDataChunk::kHeaderSize);
+  scheduler.EnableMessageInterleaving(true);
+
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
+      .WillOnce(CreateChunk(StreamID(1), MID(0), 200));
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+      .WillOnce(Return(200))  // When making active
+      .WillOnce(Return(0));
+  auto stream1 =
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
+  stream1->MaybeMakeActive();
+
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
+      .WillOnce(CreateChunk(StreamID(2), MID(1), 150));
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+      .WillOnce(Return(150))  // When making active
+      .WillOnce(Return(0));
+  auto stream2 =
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
+  stream2->MaybeMakeActive();
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
+  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
+  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
+}
+
 }  // namespace
 }  // namespace dcsctp