Revert "dcsctp: Use stream scheduler in send queue"
This reverts commit d729d12454906d924d5a142deb3432e2d5fa97ae.
Reason for revert: Breaks downstream project.
Original change's description:
> dcsctp: Use stream scheduler in send queue
>
> Changing the currently embedded scheduler that was implemented using a
> revolving pointer, to the parameterized stream scheduler that is
> implemented using a "virtual finish time" approach.
>
> Also renamed StreamCallback to StreamProducer, per review comments.
>
> Bug: webrtc:5696
> Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Victor Boivie <boivie@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#37170}
Bug: webrtc:5696
Change-Id: Iaf3608b52a31eb31b4ca604539edb2e8ca89399b
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265389
Auto-Submit: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Cr-Commit-Position: refs/heads/main@{#37172}
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index ae81db8..c2fb12e 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -14,7 +14,6 @@
"../common:internal_types",
"../packet:chunk",
"../packet:data",
- "../public:socket",
"../public:types",
]
sources = [ "send_queue.h" ]
@@ -24,12 +23,9 @@
rtc_library("rr_send_queue") {
deps = [
":send_queue",
- ":stream_scheduler",
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
- "../../../rtc_base/containers:flat_map",
- "../common:str_join",
"../packet:data",
"../public:socket",
"../public:types",
@@ -184,7 +180,6 @@
"../common:sequence_numbers",
"../packet:chunk",
"../packet:data",
- "../packet:sctp_packet",
"../public:socket",
"../public:types",
"../testing:data_generator",
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 09323d2..6127da4 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -13,14 +13,12 @@
#include <deque>
#include <limits>
#include <map>
-#include <set>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
-#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
@@ -44,18 +42,18 @@
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}
-size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
+bool RRSendQueue::OutgoingStream::HasDataToSend() const {
if (pause_state_ == PauseState::kPaused ||
pause_state_ == PauseState::kResetting) {
// The stream has paused (and there is no partially sent message).
- return 0;
+ return false;
}
if (items_.empty()) {
- return 0;
+ return false;
}
- return items_.front().remaining_size;
+ return true;
}
void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -63,30 +61,29 @@
state.next_ssn = next_ssn_.value();
state.next_ordered_mid = next_ordered_mid_.value();
state.next_unordered_mid = next_unordered_mid_.value();
- state.priority = *scheduler_stream_->priority();
+ state.priority = *priority_;
}
bool RRSendQueue::IsConsistent() const {
- std::set<StreamID> expected_active_streams;
- std::set<StreamID> actual_active_streams;
-
size_t total_buffered_amount = 0;
- for (const auto& [stream_id, stream] : streams_) {
+ for (const auto& [unused, stream] : streams_) {
total_buffered_amount += stream.buffered_amount().value();
- if (stream.bytes_to_send_in_next_message() > 0) {
- expected_active_streams.emplace(stream_id);
+ }
+
+ if (previous_message_has_ended_) {
+ auto it = streams_.find(current_stream_id_);
+ if (it != streams_.end() && it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has ended, but still partial message in stream";
+ return false;
}
- }
- for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
- actual_active_streams.emplace(stream->stream_id());
- }
- if (expected_active_streams != actual_active_streams) {
- auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
- RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
- << StrJoin(actual_active_streams, ",", fn)
- << "], expected=["
- << StrJoin(expected_active_streams, ",", fn) << "]";
- return false;
+ } else {
+ auto it = streams_.find(current_stream_id_);
+ if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has NOT ended, but there is no partial message";
+ return false;
+ }
}
return total_buffered_amount == total_buffered_amount_.value();
@@ -121,15 +118,10 @@
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options) {
- bool was_active = bytes_to_send_in_next_message() > 0;
buffered_amount_.Increase(message.payload().size());
total_buffered_amount_.Increase(message.payload().size());
items_.emplace_back(std::move(message), expires_at, send_options);
- if (!was_active) {
- scheduler_stream_->MaybeMakeActive();
- }
-
RTC_DCHECK(IsConsistent());
}
@@ -235,15 +227,8 @@
total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
- // Only partially sent messages are discarded, so if a message was
- // discarded, then it was the currently sent message.
- scheduler_stream_->ForceReschedule();
-
if (pause_state_ == PauseState::kPending) {
pause_state_ = PauseState::kPaused;
- scheduler_stream_->MakeInactive();
- } else if (bytes_to_send_in_next_message() == 0) {
- scheduler_stream_->MakeInactive();
}
// As the item still existed, it had unsent data.
@@ -292,7 +277,6 @@
if (had_pending_items && pause_state_ == PauseState::kPaused) {
RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
<< " was previously active, but is now paused.";
- scheduler_stream_->MakeInactive();
}
RTC_DCHECK(IsConsistent());
@@ -300,8 +284,11 @@
void RRSendQueue::OutgoingStream::Resume() {
RTC_DCHECK(pause_state_ == PauseState::kResetting);
+ if (!items_.empty()) {
+ RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
+ << " was previously paused, but is now active.";
+ }
pause_state_ = PauseState::kNotPaused;
- scheduler_stream_->MaybeMakeActive();
RTC_DCHECK(IsConsistent());
}
@@ -309,11 +296,6 @@
// This can be called both when an outgoing stream reset has been responded
// to, or when the entire SendQueue is reset due to detecting the peer having
// restarted. The stream may be in any state at this time.
- PauseState old_pause_state = pause_state_;
- pause_state_ = PauseState::kNotPaused;
- next_ordered_mid_ = MID(0);
- next_unordered_mid_ = MID(0);
- next_ssn_ = SSN(0);
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@@ -327,11 +309,11 @@
item.message_id = absl::nullopt;
item.ssn = absl::nullopt;
item.current_fsn = FSN(0);
- if (old_pause_state == PauseState::kPaused ||
- old_pause_state == PauseState::kResetting) {
- scheduler_stream_->MaybeMakeActive();
- }
}
+ pause_state_ = PauseState::kNotPaused;
+ next_ordered_mid_ = MID(0);
+ next_unordered_mid_ = MID(0);
+ next_ssn_ = SSN(0);
RTC_DCHECK(IsConsistent());
}
@@ -368,9 +350,67 @@
return total_buffered_amount() == 0;
}
+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream() {
+ auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
+
+ for (auto it = start_it; it != streams_.end(); ++it) {
+ if (it->second.HasDataToSend()) {
+ current_stream_id_ = it->first;
+ return it;
+ }
+ }
+
+ for (auto it = streams_.begin(); it != start_it; ++it) {
+ if (it->second.HasDataToSend()) {
+ current_stream_id_ = it->first;
+ return it;
+ }
+ }
+ return streams_.end();
+}
+
absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
size_t max_size) {
- return scheduler_.Produce(now, max_size);
+ std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
+
+ for (;;) {
+ if (previous_message_has_ended_) {
+ // Previous message has ended. Round-robin to a different stream, if there
+ // even is one with data to send.
+ stream_it = GetNextStream();
+ if (stream_it == streams_.end()) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "There is no stream with data; Can't produce any data.";
+ return absl::nullopt;
+ }
+ } else {
+ // The previous message has not ended; Continue from the current stream.
+ stream_it = streams_.find(current_stream_id_);
+ RTC_DCHECK(stream_it != streams_.end());
+ }
+
+ absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+ if (!data.has_value()) {
+ continue;
+ }
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+ << (data->data.is_unordered ? "unordered" : "ordered")
+ << "::"
+ << (*data->data.is_beginning && *data->data.is_end
+ ? "complete"
+ : *data->data.is_beginning ? "first"
+ : *data->data.is_end ? "last"
+ : "middle")
+ << ", stream_id=" << *stream_it->first
+ << ", ppid=" << *data->data.ppid
+ << ", length=" << data->data.payload.size();
+
+ previous_message_has_ended_ = *data->data.is_end;
+ RTC_DCHECK(IsConsistent());
+ return data;
+ }
}
bool RRSendQueue::Discard(IsUnordered unordered,
@@ -378,8 +418,12 @@
MID message_id) {
bool has_discarded =
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+ if (has_discarded) {
+ // Only partially sent messages are discarded, so if a message was
+ // discarded, then it was the currently sent message.
+ previous_message_has_ended_ = true;
+ }
- RTC_DCHECK(IsConsistent());
return has_discarded;
}
@@ -440,7 +484,7 @@
for (auto& [unused, stream] : streams_) {
stream.Reset();
}
- scheduler_.ForceReschedule();
+ previous_message_has_ended_ = true;
}
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@@ -472,9 +516,9 @@
}
return streams_
- .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
- std::forward_as_tuple(
- &scheduler_, stream_id, default_priority_,
+ .emplace(stream_id,
+ OutgoingStream(
+ stream_id, default_priority_,
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_))
.first->second;
@@ -518,9 +562,9 @@
state.tx.streams) {
StreamID stream_id(state_stream.id);
streams_.emplace(
- std::piecewise_construct, std::forward_as_tuple(stream_id),
- std::forward_as_tuple(
- &scheduler_, stream_id, StreamPriority(state_stream.priority),
+ stream_id,
+ OutgoingStream(
+ stream_id, StreamPriority(state_stream.priority),
[this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_, &state_stream));
}
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index eea814c..59f0d91 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -13,7 +13,6 @@
#include <cstdint>
#include <deque>
#include <map>
-#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -26,7 +25,6 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
-#include "net/dcsctp/tx/stream_scheduler.h"
namespace dcsctp {
@@ -113,33 +111,32 @@
};
// Per-stream information.
- class OutgoingStream : public StreamScheduler::StreamProducer {
+ class OutgoingStream {
public:
OutgoingStream(
- StreamScheduler* scheduler,
StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low,
ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
- : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
+ : stream_id_(stream_id),
+ priority_(priority),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)),
buffered_amount_(std::move(on_buffered_amount_low)),
total_buffered_amount_(total_buffered_amount) {}
- StreamID stream_id() const { return scheduler_stream_->stream_id(); }
+ StreamID stream_id() const { return stream_id_; }
// Enqueues a message to this stream.
void Add(DcSctpMessage message,
TimeMs expires_at,
const SendOptions& send_options);
- // Implementing `StreamScheduler::StreamProducer`.
- absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
- size_t max_size) override;
- size_t bytes_to_send_in_next_message() const override;
+ // Produces a data chunk to send, or `absl::nullopt` if nothing could be
+ // produced, e.g. if all messages have expired.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -170,10 +167,12 @@
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
- StreamPriority priority() const { return scheduler_stream_->priority(); }
- void set_priority(StreamPriority priority) {
- scheduler_stream_->set_priority(priority);
- }
+ // Indicates if the stream possibly has data to send. Note that it may
+ // return `true` for streams that have enqueued, but expired, messages.
+ bool HasDataToSend() const;
+
+ void set_priority(StreamPriority priority) { priority_ = priority; }
+ StreamPriority priority() const { return priority_; }
void AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -226,8 +225,8 @@
bool IsConsistent() const;
- const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
-
+ const StreamID stream_id_;
+ StreamPriority priority_;
PauseState pause_state_ = PauseState::kNotPaused;
// MIDs are different for unordered and ordered messages sent on a stream.
MID next_unordered_mid_;
@@ -252,10 +251,12 @@
TimeMs now,
size_t max_size);
+ // Return the next stream, in round-robin fashion.
+ std::map<StreamID, OutgoingStream>::iterator GetNextStream();
+
const std::string log_prefix_;
const size_t buffer_size_;
const StreamPriority default_priority_;
- StreamScheduler scheduler_;
// Called when the buffered amount is below what has been set using
// `SetBufferedAmountLowThreshold`.
@@ -268,6 +269,15 @@
// The total amount of buffer data, for all streams.
ThresholdWatcher total_buffered_amount_;
+ // Indicates if the previous fragment sent was the end of a message. For
+ // non-interleaved sending, this means that the next message may come from a
+ // different stream. If not true, the next fragment must be produced from the
+ // same stream as last time.
+ bool previous_message_has_ended_ = true;
+
+ // The current stream to send chunks from. Modified by `GetNextStream`.
+ StreamID current_stream_id_ = StreamID(0);
+
// All streams, and messages added to those.
std::map<StreamID, OutgoingStream> streams_;
};
diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc
index 2056dd1..22457bf 100644
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -114,7 +114,7 @@
absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
TimeMs now,
size_t max_size) {
- absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
+ absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
if (data.has_value()) {
VirtualTime new_current = GetNextFinishTime();
diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h
index d82501f..9feeb61 100644
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -56,9 +56,9 @@
};
public:
- class StreamProducer {
+ class StreamCallback {
public:
- virtual ~StreamProducer() = default;
+ virtual ~StreamCallback() = default;
// Produces a fragment of data to send. The current wall time is specified
// as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@
friend class StreamScheduler;
Stream(StreamScheduler* parent,
- StreamProducer* producer,
+ StreamCallback* callback,
StreamID stream_id,
StreamPriority priority)
: parent_(*parent),
- producer_(*producer),
+ callback_(*callback),
stream_id_(stream_id),
priority_(priority) {}
@@ -117,14 +117,14 @@
VirtualTime current_time() const { return current_virtual_time_; }
VirtualTime next_finish_time() const { return next_finish_time_; }
size_t bytes_to_send_in_next_message() const {
- return producer_.bytes_to_send_in_next_message();
+ return callback_.bytes_to_send_in_next_message();
}
// Returns the next virtual finish time for this stream.
VirtualTime GetNextFinishTime() const;
StreamScheduler& parent_;
- StreamProducer& producer_;
+ StreamCallback& callback_;
const StreamID stream_id_;
StreamPriority priority_;
// This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@
VirtualTime next_finish_time_ = VirtualTime::Zero();
};
- std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
+ std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
StreamID stream_id,
StreamPriority priority) {
- return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
+ return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
}
// Makes the scheduler stop producing message from the current stream and
diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc
index 0c239fe..cd15837 100644
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -59,7 +59,7 @@
return packet_counts;
}
-class MockStreamProducer : public StreamScheduler::StreamProducer {
+class MockStreamCallback : public StreamScheduler::StreamCallback {
public:
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
Produce,
@@ -74,18 +74,18 @@
StreamID stream_id,
StreamPriority priority,
size_t packet_size = kPayloadSize) {
- EXPECT_CALL(producer_, Produce)
+ EXPECT_CALL(callback_, Produce)
.WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
- EXPECT_CALL(producer_, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback_, bytes_to_send_in_next_message)
.WillRepeatedly(Return(packet_size));
- stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
+ stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
stream_->MaybeMakeActive();
}
StreamScheduler::Stream& stream() { return *stream_; }
private:
- StrictMock<MockStreamProducer> producer_;
+ StrictMock<MockStreamCallback> callback_;
std::unique_ptr<StreamScheduler::Stream> stream_;
};
@@ -100,9 +100,9 @@
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer;
+ StrictMock<MockStreamCallback> callback;
auto stream =
- scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
EXPECT_EQ(stream->stream_id(), StreamID(1));
EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer;
- EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
- EXPECT_CALL(producer, bytes_to_send_in_next_message)
+ StrictMock<MockStreamCallback> callback;
+ EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+ EXPECT_CALL(callback, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(0));
auto stream =
- scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
stream->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer1;
- EXPECT_CALL(producer1, Produce)
+ StrictMock<MockStreamCallback> callback1;
+ EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamProducer> producer2;
- EXPECT_CALL(producer2, Produce)
+ StrictMock<MockStreamCallback> callback2;
+ EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer1;
- EXPECT_CALL(producer1, Produce)
+ StrictMock<MockStreamCallback> callback1;
+ EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce([](...) {
return SendQueue::DataToSend(
@@ -196,7 +196,7 @@
Data::IsEnd(true), IsUnordered(true)));
})
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
@@ -204,21 +204,21 @@
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamProducer> producer2;
- EXPECT_CALL(producer2, Produce)
+ StrictMock<MockStreamCallback> callback2;
+ EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer1;
- EXPECT_CALL(producer1, Produce)
+ StrictMock<MockStreamCallback> callback1;
+ EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)));
- EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
auto stream1 =
- scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer1;
+ StrictMock<MockStreamCallback> callback1;
// Callbacks are setup so that they hint that there is a MID(2) coming...
- EXPECT_CALL(producer1, Produce)
+ EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active again
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
StreamScheduler scheduler;
- StrictMock<MockStreamProducer> producer1;
- EXPECT_CALL(producer1, Produce)
+ StrictMock<MockStreamCallback> callback1;
+ EXPECT_CALL(callback1, Produce)
.WillOnce(CreateChunk(StreamID(1), MID(100)))
.WillOnce(CreateChunk(StreamID(1), MID(101)))
.WillOnce(CreateChunk(StreamID(1), MID(102)));
- EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback1, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream1 =
- scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+ scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
stream1->MaybeMakeActive();
- StrictMock<MockStreamProducer> producer2;
- EXPECT_CALL(producer2, Produce)
+ StrictMock<MockStreamCallback> callback2;
+ EXPECT_CALL(callback2, Produce)
.WillOnce(CreateChunk(StreamID(2), MID(200)))
.WillOnce(CreateChunk(StreamID(2), MID(201)))
.WillOnce(CreateChunk(StreamID(2), MID(202)));
- EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+ EXPECT_CALL(callback2, bytes_to_send_in_next_message)
.WillOnce(Return(kPayloadSize)) // When making active
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(kPayloadSize))
.WillOnce(Return(0));
auto stream2 =
- scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+ scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
stream2->MaybeMakeActive();
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));