dcsctp: Use stream scheduler in send queue
Changing the currently embedded scheduler that was implemented using a
revolving pointer, to the parameterized stream scheduler that is
implemented using a "virtual finish time" approach.
Also renamed StreamCallback to StreamProducer, per review comments.
Bug: webrtc:5696
Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37170}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index c2fb12e..ae81db8 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -14,6 +14,7 @@
"../common:internal_types",
"../packet:chunk",
"../packet:data",
+ "../public:socket",
"../public:types",
]
sources = [ "send_queue.h" ]
@@ -23,9 +24,12 @@
rtc_library("rr_send_queue") {
deps = [
":send_queue",
+ ":stream_scheduler",
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base/containers:flat_map",
+ "../common:str_join",
"../packet:data",
"../public:socket",
"../public:types",
@@ -180,6 +184,7 @@
"../common:sequence_numbers",
"../packet:chunk",
"../packet:data",
+ "../packet:sctp_packet",
"../public:socket",
"../public:types",
"../testing:data_generator",
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 6127da4..09323d2 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -13,12 +13,14 @@
#include <deque>
#include <limits>
#include <map>
+#include <set>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
+#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
@@ -42,18 +44,18 @@
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}
-bool RRSendQueue::OutgoingStream::HasDataToSend() const {
+size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
if (pause_state_ == PauseState::kPaused ||
pause_state_ == PauseState::kResetting) {
// The stream has paused (and there is no partially sent message).
- return false;
+ return 0;
}
if (items_.empty()) {
- return false;
+ return 0;
}
- return true;
+ return items_.front().remaining_size;
}
void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -61,29 +63,30 @@
state.next_ssn = next_ssn_.value();
state.next_ordered_mid = next_ordered_mid_.value();
state.next_unordered_mid = next_unordered_mid_.value();
- state.priority = *priority_;
+ state.priority = *scheduler_stream_->priority();
}
bool RRSendQueue::IsConsistent() const {
- size_t total_buffered_amount = 0;
- for (const auto& [unused, stream] : streams_) {
- total_buffered_amount += stream.buffered_amount().value();
- }
+ std::set<StreamID> expected_active_streams;
+ std::set<StreamID> actual_active_streams;
- if (previous_message_has_ended_) {
- auto it = streams_.find(current_stream_id_);
- if (it != streams_.end() && it->second.has_partially_sent_message()) {
- RTC_DLOG(LS_ERROR)
- << "Previous message has ended, but still partial message in stream";
- return false;
+ size_t total_buffered_amount = 0;
+ for (const auto& [stream_id, stream] : streams_) {
+ total_buffered_amount += stream.buffered_amount().value();
+ if (stream.bytes_to_send_in_next_message() > 0) {
+ expected_active_streams.emplace(stream_id);
}
- } else {
- auto it = streams_.find(current_stream_id_);
- if (it == streams_.end() || !it->second.has_partially_sent_message()) {
- RTC_DLOG(LS_ERROR)
- << "Previous message has NOT ended, but there is no partial message";
- return false;
- }
+ }
+ for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
+ actual_active_streams.emplace(stream->stream_id());
+ }
+ if (expected_active_streams != actual_active_streams) {
+ auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
+ RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
+ << StrJoin(actual_active_streams, ",", fn)
+ << "], expected=["
+ << StrJoin(expected_active_streams, ",", fn) << "]";
+ return false;
}
return total_buffered_amount == total_buffered_amount_.value();
@@ -118,10 +121,15 @@
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options) {
+ bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), expires_at, send_options);
+ if (!was_active) {
+ scheduler_stream_->MaybeMakeActive();
+ }
+
RTC_DCHECK(IsConsistent());
}
@@ -227,8 +235,15 @@
total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
+ // Only partially sent messages are discarded, so if a message was
+ // discarded, then it was the currently sent message.
+ scheduler_stream_->ForceReschedule();
+
if (pause_state_ == PauseState::kPending) {
pause_state_ = PauseState::kPaused;
+ scheduler_stream_->MakeInactive();
+ } else if (bytes_to_send_in_next_message() == 0) {
+ scheduler_stream_->MakeInactive();
}
// As the item still existed, it had unsent data.
@@ -277,6 +292,7 @@
if (had_pending_items && pause_state_ == PauseState::kPaused) {
RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
<< " was previously active, but is now paused.";
+ scheduler_stream_->MakeInactive();
}
RTC_DCHECK(IsConsistent());
@@ -284,11 +300,8 @@
void RRSendQueue::OutgoingStream::Resume() {
RTC_DCHECK(pause_state_ == PauseState::kResetting);
- if (!items_.empty()) {
- RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
- << " was previously paused, but is now active.";
- }
pause_state_ = PauseState::kNotPaused;
+ scheduler_stream_->MaybeMakeActive();
RTC_DCHECK(IsConsistent());
}
@@ -296,6 +309,11 @@
// This can be called both when an outgoing stream reset has been responded
// to, or when the entire SendQueue is reset due to detecting the peer having
// restarted. The stream may be in any state at this time.
+ PauseState old_pause_state = pause_state_;
+ pause_state_ = PauseState::kNotPaused;
+ next_ordered_mid_ = MID(0);
+ next_unordered_mid_ = MID(0);
+ next_ssn_ = SSN(0);
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@@ -309,11 +327,11 @@
item.message_id = absl::nullopt;
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
+ if (old_pause_state == PauseState::kPaused ||
+ old_pause_state == PauseState::kResetting) {
+ scheduler_stream_->MaybeMakeActive();
+ }
}
- pause_state_ = PauseState::kNotPaused;
- next_ordered_mid_ = MID(0);
- next_unordered_mid_ = MID(0);
- next_ssn_ = SSN(0);
RTC_DCHECK(IsConsistent());
}
@@ -350,67 +368,9 @@
return total_buffered_amount() == 0;
}
-std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
-RRSendQueue::GetNextStream() {
- auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
-
- for (auto it = start_it; it != streams_.end(); ++it) {
- if (it->second.HasDataToSend()) {
- current_stream_id_ = it->first;
- return it;
- }
- }
-
- for (auto it = streams_.begin(); it != start_it; ++it) {
- if (it->second.HasDataToSend()) {
- current_stream_id_ = it->first;
- return it;
- }
- }
- return streams_.end();
-}
-
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
size_t max_size) {
- std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
-
- for (;;) {
- if (previous_message_has_ended_) {
- // Previous message has ended. Round-robin to a different stream, if there
- // even is one with data to send.
- stream_it = GetNextStream();
- if (stream_it == streams_.end()) {
- RTC_DLOG(LS_VERBOSE)
- << log_prefix_
- << "There is no stream with data; Can't produce any data.";
- return absl::nullopt;
- }
- } else {
- // The previous message has not ended; Continue from the current stream.
- stream_it = streams_.find(current_stream_id_);
- RTC_DCHECK(stream_it != streams_.end());
- }
-
- absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
- if (!data.has_value()) {
- continue;
- }
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
- << (data->data.is_unordered ? "unordered" : "ordered")
- << "::"
- << (*data->data.is_beginning && *data->data.is_end
- ? "complete"
- : *data->data.is_beginning ? "first"
- : *data->data.is_end ? "last"
- : "middle")
- << ", stream_id=" << *stream_it->first
- << ", ppid=" << *data->data.ppid
- << ", length=" << data->data.payload.size();
-
- previous_message_has_ended_ = *data->data.is_end;
- RTC_DCHECK(IsConsistent());
- return data;
- }
+ return scheduler_.Produce(now, max_size);
}
bool RRSendQueue::Discard(IsUnordered unordered,
@@ -418,12 +378,8 @@
MID message_id) {
bool has_discarded =
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
- if (has_discarded) {
- // Only partially sent messages are discarded, so if a message was
- // discarded, then it was the currently sent message.
- previous_message_has_ended_ = true;
- }
+ RTC_DCHECK(IsConsistent());
return has_discarded;
}
@@ -484,7 +440,7 @@
for (auto& [unused, stream] : streams_) {
stream.Reset();
}
- previous_message_has_ended_ = true;
+ scheduler_.ForceReschedule();
}
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@@ -516,9 +472,9 @@
}
return streams_
- .emplace(stream_id,
- OutgoingStream(
- stream_id, default_priority_,
+ .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(
+ &scheduler_, stream_id, default_priority_,
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_))
.first->second;
@@ -562,9 +518,9 @@
state.tx.streams) {
StreamID stream_id(state_stream.id);
streams_.emplace(
- stream_id,
- OutgoingStream(
- stream_id, StreamPriority(state_stream.priority),
+ std::piecewise_construct, std::forward_as_tuple(stream_id),
+ std::forward_as_tuple(
+ &scheduler_, stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_, &state_stream));
}
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 59f0d91..eea814c 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -13,6 +13,7 @@
#include <cstdint>
#include <deque>
#include <map>
+#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -25,6 +26,7 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
+#include "net/dcsctp/tx/stream_scheduler.h"
namespace dcsctp {
@@ -111,32 +113,33 @@
};
// Per-stream information.
- class OutgoingStream {
+ class OutgoingStream : public StreamScheduler::StreamProducer {
public:
OutgoingStream(
+ StreamScheduler* scheduler,
StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low,
ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
- : stream_id_(stream_id),
- priority_(priority),
+ : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)),
buffered_amount_(std::move(on_buffered_amount_low)),
total_buffered_amount_(total_buffered_amount) {}
- StreamID stream_id() const { return stream_id_; }
+ StreamID stream_id() const { return scheduler_stream_->stream_id(); }
// Enqueues a message to this stream.
void Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options);
- // Produces a data chunk to send, or `absl::nullopt` if nothing could be
- // produced, e.g. if all messages have expired.
- absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
+ // Implementing `StreamScheduler::StreamProducer`.
+ absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
+ size_t max_size) override;
+ size_t bytes_to_send_in_next_message() const override;
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -167,12 +170,10 @@
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
- // Indicates if the stream possibly has data to send. Note that it may
- // return `true` for streams that have enqueued, but expired, messages.
- bool HasDataToSend() const;
-
- void set_priority(StreamPriority priority) { priority_ = priority; }
- StreamPriority priority() const { return priority_; }
+ StreamPriority priority() const { return scheduler_stream_->priority(); }
+ void set_priority(StreamPriority priority) {
+ scheduler_stream_->set_priority(priority);
+ }
void AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -225,8 +226,8 @@
bool IsConsistent() const;
- const StreamID stream_id_;
- StreamPriority priority_;
+ const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
+
PauseState pause_state_ = PauseState::kNotPaused;
// MIDs are different for unordered and ordered messages sent on a stream.
MID next_unordered_mid_;
@@ -251,12 +252,10 @@
TimeMs now,
size_t max_size);
- // Return the next stream, in round-robin fashion.
- std::map<StreamID, OutgoingStream>::iterator GetNextStream();
-
const std::string log_prefix_;
const size_t buffer_size_;
const StreamPriority default_priority_;
+ StreamScheduler scheduler_;
// Called when the buffered amount is below what has been set using
// `SetBufferedAmountLowThreshold`.
@@ -269,15 +268,6 @@
// The total amount of buffer data, for all streams.
ThresholdWatcher total_buffered_amount_;
- // Indicates if the previous fragment sent was the end of a message. For
- // non-interleaved sending, this means that the next message may come from a
- // different stream. If not true, the next fragment must be produced from the
- // same stream as last time.
- bool previous_message_has_ended_ = true;
-
- // The current stream to send chunks from. Modified by `GetNextStream`.
- StreamID current_stream_id_ = StreamID(0);
-
// All streams, and messages added to those.
std::map<StreamID, OutgoingStream> streams_;
};
diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc
index 22457bf..2056dd1 100644
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -114,7 +114,7 @@
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
TimeMs now,
size_t max_size) {
- absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
+ absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
if (data.has_value()) {
VirtualTime new_current = GetNextFinishTime();
diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h
index 9feeb61..d82501f 100644
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -56,9 +56,9 @@
};
public:
- class StreamCallback {
+ class StreamProducer {
public:
- virtual ~StreamCallback() = default;
+ virtual ~StreamProducer() = default;
// Produces a fragment of data to send. The current wall time is specified
// as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@
friend class StreamScheduler;
Stream(StreamScheduler* parent,
- StreamCallback* callback,
+ StreamProducer* producer,
StreamID stream_id,
StreamPriority priority)
: parent_(*parent),
- callback_(*callback),
+ producer_(*producer),
stream_id_(stream_id),
priority_(priority) {}
@@ -117,14 +117,14 @@
VirtualTime current_time() const { return current_virtual_time_; }
VirtualTime next_finish_time() const { return next_finish_time_; }
size_t bytes_to_send_in_next_message() const {
- return callback_.bytes_to_send_in_next_message();
+ return producer_.bytes_to_send_in_next_message();
}
// Returns the next virtual finish time for this stream.
VirtualTime GetNextFinishTime() const;
StreamScheduler& parent_;
- StreamCallback& callback_;
+ StreamProducer& producer_;
const StreamID stream_id_;
StreamPriority priority_;
// This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@
VirtualTime next_finish_time_ = VirtualTime::Zero();
};
- std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
+ std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
StreamID stream_id,
StreamPriority priority) {
- return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
+ return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
}
// Makes the scheduler stop producing message from the current stream and
diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc
index cd15837..0c239fe 100644
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -59,7 +59,7 @@
return packet_counts;
}
-class MockStreamCallback : public StreamScheduler::StreamCallback {
+class MockStreamProducer : public StreamScheduler::StreamProducer {
public:
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
@@ -74,18 +74,18 @@
StreamID stream_id,
StreamPriority priority,
size_t packet_size = kPayloadSize) {
- EXPECT_CALL(callback_, Produce)
+ EXPECT_CALL(producer_, Produce)
.WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
- EXPECT_CALL(callback_, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer_, bytes_to_send_in_next_message)
.WillRepeatedly(Return(packet_size));
- stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
+ stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
stream_->MaybeMakeActive();
}
StreamScheduler::Stream& stream() { return *stream_; }
private:
- StrictMock<MockStreamCallback> callback_;
+ StrictMock<MockStreamProducer> producer_;
std::unique_ptr<StreamScheduler::Stream> stream_;
};
@@ -100,9 +100,9 @@
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback;
+ StrictMock<MockStreamProducer> producer;
auto stream =
- scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
EXPECT_EQ(stream->stream_id(), StreamID(1));
EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback;
- EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
- EXPECT_CALL(callback, bytes_to_send_in_next_message)
+ StrictMock<MockStreamProducer> producer;
+ EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+ EXPECT_CALL(producer, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(0));
auto stream =
- scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
stream->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback1;
- EXPECT_CALL(callback1, Produce)
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamCallback> callback2;
- EXPECT_CALL(callback2, Produce)
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback1;
- EXPECT_CALL(callback1, Produce)
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce([](...) {
return SendQueue::DataToSend(
@@ -196,7 +196,7 @@
Data::IsEnd(true), IsUnordered(true)));
})
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
@@ -204,21 +204,21 @@
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamCallback> callback2;
- EXPECT_CALL(callback2, Produce)
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback1;
- EXPECT_CALL(callback1, Produce)
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)));
- EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
auto stream1 =
- scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback1;
+ StrictMock<MockStreamProducer> producer1;
// Callbacks are setup so that they hint that there is a MID(2) coming...
- EXPECT_CALL(callback1, Produce)
+ EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active again
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StreamScheduler scheduler;
- StrictMock<MockStreamCallback> callback1;
- EXPECT_CALL(callback1, Produce)
+ StrictMock<MockStreamProducer> producer1;
+ EXPECT_CALL(producer1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(callback1, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamCallback> callback2;
- EXPECT_CALL(callback2, Produce)
+ StrictMock<MockStreamProducer> producer2;
+ EXPECT_CALL(producer2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(callback2, bytes_to_send_in_next_message)
+ EXPECT_CALL(producer2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));