dcsctp: Use stream scheduler in send queue

Changing the currently embedded scheduler that was implemented using a
revolving pointer, to the parameterized stream scheduler that is
implemented using a "virtual finish time" approach.

Also renamed StreamCallback to StreamProducer, per review comments.

Bug: webrtc:5696
Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37170}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index c2fb12e..ae81db8 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -14,6 +14,7 @@
     "../common:internal_types",
     "../packet:chunk",
     "../packet:data",
+    "../public:socket",
     "../public:types",
   ]
   sources = [ "send_queue.h" ]
@@ -23,9 +24,12 @@
 rtc_library("rr_send_queue") {
   deps = [
     ":send_queue",
+    ":stream_scheduler",
     "../../../api:array_view",
     "../../../rtc_base:checks",
     "../../../rtc_base:logging",
+    "../../../rtc_base/containers:flat_map",
+    "../common:str_join",
     "../packet:data",
     "../public:socket",
     "../public:types",
@@ -180,6 +184,7 @@
       "../common:sequence_numbers",
       "../packet:chunk",
       "../packet:data",
+      "../packet:sctp_packet",
       "../public:socket",
       "../public:types",
       "../testing:data_generator",
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 6127da4..09323d2 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -13,12 +13,14 @@
 #include <deque>
 #include <limits>
 #include <map>
+#include <set>
 #include <utility>
 #include <vector>
 
 #include "absl/algorithm/container.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
+#include "net/dcsctp/common/str_join.h"
 #include "net/dcsctp/packet/data.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
@@ -42,18 +44,18 @@
   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
 }
 
-bool RRSendQueue::OutgoingStream::HasDataToSend() const {
+size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
   if (pause_state_ == PauseState::kPaused ||
       pause_state_ == PauseState::kResetting) {
     // The stream has paused (and there is no partially sent message).
-    return false;
+    return 0;
   }
 
   if (items_.empty()) {
-    return false;
+    return 0;
   }
 
-  return true;
+  return items_.front().remaining_size;
 }
 
 void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -61,29 +63,30 @@
   state.next_ssn = next_ssn_.value();
   state.next_ordered_mid = next_ordered_mid_.value();
   state.next_unordered_mid = next_unordered_mid_.value();
-  state.priority = *priority_;
+  state.priority = *scheduler_stream_->priority();
 }
 
 bool RRSendQueue::IsConsistent() const {
-  size_t total_buffered_amount = 0;
-  for (const auto& [unused, stream] : streams_) {
-    total_buffered_amount += stream.buffered_amount().value();
-  }
+  std::set<StreamID> expected_active_streams;
+  std::set<StreamID> actual_active_streams;
 
-  if (previous_message_has_ended_) {
-    auto it = streams_.find(current_stream_id_);
-    if (it != streams_.end() && it->second.has_partially_sent_message()) {
-      RTC_DLOG(LS_ERROR)
-          << "Previous message has ended, but still partial message in stream";
-      return false;
+  size_t total_buffered_amount = 0;
+  for (const auto& [stream_id, stream] : streams_) {
+    total_buffered_amount += stream.buffered_amount().value();
+    if (stream.bytes_to_send_in_next_message() > 0) {
+      expected_active_streams.emplace(stream_id);
     }
-  } else {
-    auto it = streams_.find(current_stream_id_);
-    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
-      RTC_DLOG(LS_ERROR)
-          << "Previous message has NOT ended, but there is no partial message";
-      return false;
-    }
+  }
+  for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
+    actual_active_streams.emplace(stream->stream_id());
+  }
+  if (expected_active_streams != actual_active_streams) {
+    auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
+    RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
+                       << StrJoin(actual_active_streams, ",", fn)
+                       << "], expected=["
+                       << StrJoin(expected_active_streams, ",", fn) << "]";
+    return false;
   }
 
   return total_buffered_amount == total_buffered_amount_.value();
@@ -118,10 +121,15 @@
 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
                                       TimeMs expires_at,
                                       const SendOptions& send_options) {
+  bool was_active = bytes_to_send_in_next_message() > 0;
   buffered_amount_.Increase(message.payload().size());
   total_buffered_amount_.Increase(message.payload().size());
   items_.emplace_back(std::move(message), expires_at, send_options);
 
+  if (!was_active) {
+    scheduler_stream_->MaybeMakeActive();
+  }
+
   RTC_DCHECK(IsConsistent());
 }
 
@@ -227,8 +235,15 @@
       total_buffered_amount_.Decrease(item.remaining_size);
       items_.pop_front();
 
+      // Only partially sent messages are discarded, so if a message was
+      // discarded, then it was the currently sent message.
+      scheduler_stream_->ForceReschedule();
+
       if (pause_state_ == PauseState::kPending) {
         pause_state_ = PauseState::kPaused;
+        scheduler_stream_->MakeInactive();
+      } else if (bytes_to_send_in_next_message() == 0) {
+        scheduler_stream_->MakeInactive();
       }
 
       // As the item still existed, it had unsent data.
@@ -277,6 +292,7 @@
   if (had_pending_items && pause_state_ == PauseState::kPaused) {
     RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                          << " was previously active, but is now paused.";
+    scheduler_stream_->MakeInactive();
   }
 
   RTC_DCHECK(IsConsistent());
@@ -284,11 +300,8 @@
 
 void RRSendQueue::OutgoingStream::Resume() {
   RTC_DCHECK(pause_state_ == PauseState::kResetting);
-  if (!items_.empty()) {
-    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
-                         << " was previously paused, but is now active.";
-  }
   pause_state_ = PauseState::kNotPaused;
+  scheduler_stream_->MaybeMakeActive();
   RTC_DCHECK(IsConsistent());
 }
 
@@ -296,6 +309,11 @@
   // This can be called both when an outgoing stream reset has been responded
   // to, or when the entire SendQueue is reset due to detecting the peer having
   // restarted. The stream may be in any state at this time.
+  PauseState old_pause_state = pause_state_;
+  pause_state_ = PauseState::kNotPaused;
+  next_ordered_mid_ = MID(0);
+  next_unordered_mid_ = MID(0);
+  next_ssn_ = SSN(0);
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -309,11 +327,11 @@
     item.message_id = absl::nullopt;
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
+    if (old_pause_state == PauseState::kPaused ||
+        old_pause_state == PauseState::kResetting) {
+      scheduler_stream_->MaybeMakeActive();
+    }
   }
-  pause_state_ = PauseState::kNotPaused;
-  next_ordered_mid_ = MID(0);
-  next_unordered_mid_ = MID(0);
-  next_ssn_ = SSN(0);
   RTC_DCHECK(IsConsistent());
 }
 
@@ -350,67 +368,9 @@
   return total_buffered_amount() == 0;
 }
 
-std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
-RRSendQueue::GetNextStream() {
-  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
-
-  for (auto it = start_it; it != streams_.end(); ++it) {
-    if (it->second.HasDataToSend()) {
-      current_stream_id_ = it->first;
-      return it;
-    }
-  }
-
-  for (auto it = streams_.begin(); it != start_it; ++it) {
-    if (it->second.HasDataToSend()) {
-      current_stream_id_ = it->first;
-      return it;
-    }
-  }
-  return streams_.end();
-}
-
 absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
                                                            size_t max_size) {
-  std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
-
-  for (;;) {
-    if (previous_message_has_ended_) {
-      // Previous message has ended. Round-robin to a different stream, if there
-      // even is one with data to send.
-      stream_it = GetNextStream();
-      if (stream_it == streams_.end()) {
-        RTC_DLOG(LS_VERBOSE)
-            << log_prefix_
-            << "There is no stream with data; Can't produce any data.";
-        return absl::nullopt;
-      }
-    } else {
-      // The previous message has not ended; Continue from the current stream.
-      stream_it = streams_.find(current_stream_id_);
-      RTC_DCHECK(stream_it != streams_.end());
-    }
-
-    absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
-    if (!data.has_value()) {
-      continue;
-    }
-    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
-                         << (data->data.is_unordered ? "unordered" : "ordered")
-                         << "::"
-                         << (*data->data.is_beginning && *data->data.is_end
-                                 ? "complete"
-                             : *data->data.is_beginning ? "first"
-                             : *data->data.is_end       ? "last"
-                                                        : "middle")
-                         << ", stream_id=" << *stream_it->first
-                         << ", ppid=" << *data->data.ppid
-                         << ", length=" << data->data.payload.size();
-
-    previous_message_has_ended_ = *data->data.is_end;
-    RTC_DCHECK(IsConsistent());
-    return data;
-  }
+  return scheduler_.Produce(now, max_size);
 }
 
 bool RRSendQueue::Discard(IsUnordered unordered,
@@ -418,12 +378,8 @@
                           MID message_id) {
   bool has_discarded =
       GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
-  if (has_discarded) {
-    // Only partially sent messages are discarded, so if a message was
-    // discarded, then it was the currently sent message.
-    previous_message_has_ended_ = true;
-  }
 
+  RTC_DCHECK(IsConsistent());
   return has_discarded;
 }
 
@@ -484,7 +440,7 @@
   for (auto& [unused, stream] : streams_) {
     stream.Reset();
   }
-  previous_message_has_ended_ = true;
+  scheduler_.ForceReschedule();
 }
 
 size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@@ -516,9 +472,9 @@
   }
 
   return streams_
-      .emplace(stream_id,
-               OutgoingStream(
-                   stream_id, default_priority_,
+      .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+               std::forward_as_tuple(
+                   &scheduler_, stream_id, default_priority_,
                    [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                    total_buffered_amount_))
       .first->second;
@@ -562,9 +518,9 @@
        state.tx.streams) {
     StreamID stream_id(state_stream.id);
     streams_.emplace(
-        stream_id,
-        OutgoingStream(
-            stream_id, StreamPriority(state_stream.priority),
+        std::piecewise_construct, std::forward_as_tuple(stream_id),
+        std::forward_as_tuple(
+            &scheduler_, stream_id, StreamPriority(state_stream.priority),
             [this, stream_id]() { on_buffered_amount_low_(stream_id); },
             total_buffered_amount_, &state_stream));
   }
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 59f0d91..eea814c 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -13,6 +13,7 @@
 #include <cstdint>
 #include <deque>
 #include <map>
+#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -25,6 +26,7 @@
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
 #include "net/dcsctp/tx/send_queue.h"
+#include "net/dcsctp/tx/stream_scheduler.h"
 
 namespace dcsctp {
 
@@ -111,32 +113,33 @@
   };
 
   // Per-stream information.
-  class OutgoingStream {
+  class OutgoingStream : public StreamScheduler::StreamProducer {
    public:
     OutgoingStream(
+        StreamScheduler* scheduler,
         StreamID stream_id,
         StreamPriority priority,
         std::function<void()> on_buffered_amount_low,
         ThresholdWatcher& total_buffered_amount,
         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
-        : stream_id_(stream_id),
-          priority_(priority),
+        : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
           next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
           next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
           next_ssn_(SSN(state ? state->next_ssn : 0)),
           buffered_amount_(std::move(on_buffered_amount_low)),
           total_buffered_amount_(total_buffered_amount) {}
 
-    StreamID stream_id() const { return stream_id_; }
+    StreamID stream_id() const { return scheduler_stream_->stream_id(); }
 
     // Enqueues a message to this stream.
     void Add(DcSctpMessage message,
              TimeMs expires_at,
              const SendOptions& send_options);
 
-    // Produces a data chunk to send, or `absl::nullopt` if nothing could be
-    // produced, e.g. if all messages have expired.
-    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+    // Implementing `StreamScheduler::StreamProducer`.
+    absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+                                                  size_t max_size) override;
+    size_t bytes_to_send_in_next_message() const override;
 
     const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
     ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -167,12 +170,10 @@
     // Indicates if this stream has a partially sent message in it.
     bool has_partially_sent_message() const;
 
-    // Indicates if the stream possibly has data to send. Note that it may
-    // return `true` for streams that have enqueued, but expired, messages.
-    bool HasDataToSend() const;
-
-    void set_priority(StreamPriority priority) { priority_ = priority; }
-    StreamPriority priority() const { return priority_; }
+    StreamPriority priority() const { return scheduler_stream_->priority(); }
+    void set_priority(StreamPriority priority) {
+      scheduler_stream_->set_priority(priority);
+    }
 
     void AddHandoverState(
         DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -225,8 +226,8 @@
 
     bool IsConsistent() const;
 
-    const StreamID stream_id_;
-    StreamPriority priority_;
+    const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
+
     PauseState pause_state_ = PauseState::kNotPaused;
     // MIDs are different for unordered and ordered messages sent on a stream.
     MID next_unordered_mid_;
@@ -251,12 +252,10 @@
       TimeMs now,
       size_t max_size);
 
-  // Return the next stream, in round-robin fashion.
-  std::map<StreamID, OutgoingStream>::iterator GetNextStream();
-
   const std::string log_prefix_;
   const size_t buffer_size_;
   const StreamPriority default_priority_;
+  StreamScheduler scheduler_;
 
   // Called when the buffered amount is below what has been set using
   // `SetBufferedAmountLowThreshold`.
@@ -269,15 +268,6 @@
   // The total amount of buffer data, for all streams.
   ThresholdWatcher total_buffered_amount_;
 
-  // Indicates if the previous fragment sent was the end of a message. For
-  // non-interleaved sending, this means that the next message may come from a
-  // different stream. If not true, the next fragment must be produced from the
-  // same stream as last time.
-  bool previous_message_has_ended_ = true;
-
-  // The current stream to send chunks from. Modified by `GetNextStream`.
-  StreamID current_stream_id_ = StreamID(0);
-
   // All streams, and messages added to those.
   std::map<StreamID, OutgoingStream> streams_;
 };
diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc
index 22457bf..2056dd1 100644
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -114,7 +114,7 @@
 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
     TimeMs now,
     size_t max_size) {
-  absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
+  absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
 
   if (data.has_value()) {
     VirtualTime new_current = GetNextFinishTime();
diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h
index 9feeb61..d82501f 100644
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -56,9 +56,9 @@
   };
 
  public:
-  class StreamCallback {
+  class StreamProducer {
    public:
-    virtual ~StreamCallback() = default;
+    virtual ~StreamProducer() = default;
 
     // Produces a fragment of data to send. The current wall time is specified
     // as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@
     friend class StreamScheduler;
 
     Stream(StreamScheduler* parent,
-           StreamCallback* callback,
+           StreamProducer* producer,
            StreamID stream_id,
            StreamPriority priority)
         : parent_(*parent),
-          callback_(*callback),
+          producer_(*producer),
           stream_id_(stream_id),
           priority_(priority) {}
 
@@ -117,14 +117,14 @@
     VirtualTime current_time() const { return current_virtual_time_; }
     VirtualTime next_finish_time() const { return next_finish_time_; }
     size_t bytes_to_send_in_next_message() const {
-      return callback_.bytes_to_send_in_next_message();
+      return producer_.bytes_to_send_in_next_message();
     }
 
     // Returns the next virtual finish time for this stream.
     VirtualTime GetNextFinishTime() const;
 
     StreamScheduler& parent_;
-    StreamCallback& callback_;
+    StreamProducer& producer_;
     const StreamID stream_id_;
     StreamPriority priority_;
     // This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@
     VirtualTime next_finish_time_ = VirtualTime::Zero();
   };
 
-  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
+  std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
                                        StreamID stream_id,
                                        StreamPriority priority) {
-    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
+    return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
   }
 
   // Makes the scheduler stop producing message from the current stream and
diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc
index cd15837..0c239fe 100644
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -59,7 +59,7 @@
   return packet_counts;
 }
 
-class MockStreamCallback : public StreamScheduler::StreamCallback {
+class MockStreamProducer : public StreamScheduler::StreamProducer {
  public:
   MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
               Produce,
@@ -74,18 +74,18 @@
              StreamID stream_id,
              StreamPriority priority,
              size_t packet_size = kPayloadSize) {
-    EXPECT_CALL(callback_, Produce)
+    EXPECT_CALL(producer_, Produce)
         .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
-    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
+    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
         .WillRepeatedly(Return(packet_size));
-    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
+    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
     stream_->MaybeMakeActive();
   }
 
   StreamScheduler::Stream& stream() { return *stream_; }
 
  private:
-  StrictMock<MockStreamCallback> callback_;
+  StrictMock<MockStreamProducer> producer_;
   std::unique_ptr<StreamScheduler::Stream> stream_;
 };
 
@@ -100,9 +100,9 @@
 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback;
+  StrictMock<MockStreamProducer> producer;
   auto stream =
-      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
 
   EXPECT_EQ(stream->stream_id(), StreamID(1));
   EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@
 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback;
-  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
-  EXPECT_CALL(callback, bytes_to_send_in_next_message)
+  StrictMock<MockStreamProducer> producer;
+  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+  EXPECT_CALL(producer, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(0));
   auto stream =
-      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
   stream->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@
 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
       .WillOnce(CreateChunk(StreamID(1), MID(101)))
       .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
       .WillOnce(CreateChunk(StreamID(2), MID(200)))
       .WillOnce(CreateChunk(StreamID(2), MID(201)))
       .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@
 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
       .WillOnce([](...) {
         return SendQueue::DataToSend(
@@ -196,7 +196,7 @@
                  Data::IsEnd(true), IsUnordered(true)));
       })
       .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
@@ -204,21 +204,21 @@
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
       .WillOnce(CreateChunk(StreamID(2), MID(200)))
       .WillOnce(CreateChunk(StreamID(2), MID(201)))
       .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@
 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
       .WillOnce(CreateChunk(StreamID(1), MID(101)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@
 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback1;
+  StrictMock<MockStreamProducer> producer1;
   // Callbacks are setup so that they hint that there is a MID(2) coming...
-  EXPECT_CALL(callback1, Produce)
+  EXPECT_CALL(producer1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
       .WillOnce(CreateChunk(StreamID(1), MID(101)))
       .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))  // When making active again
       .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@
 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamCallback> callback1;
-  EXPECT_CALL(callback1, Produce)
+  StrictMock<MockStreamProducer> producer1;
+  EXPECT_CALL(producer1, Produce)
       .WillOnce(CreateChunk(StreamID(1), MID(100)))
       .WillOnce(CreateChunk(StreamID(1), MID(101)))
       .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamCallback> callback2;
-  EXPECT_CALL(callback2, Produce)
+  StrictMock<MockStreamProducer> producer2;
+  EXPECT_CALL(producer2, Produce)
       .WillOnce(CreateChunk(StreamID(2), MID(200)))
       .WillOnce(CreateChunk(StreamID(2), MID(201)))
       .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
       .WillOnce(Return(kPayloadSize))  // When making active
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(kPayloadSize))
       .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));