// blob: 0c239fe8b0be6f598319ecfdf3a413287235230a [file] [log] [blame]
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/tx/stream_scheduler.h"
#include <vector>
#include "net/dcsctp/public/types.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::Return;
using ::testing::StrictMock;
// Size limit passed as the second argument to every StreamScheduler::Produce
// call made by the tests in this file.
constexpr size_t kMtu = 1000;
// Default payload size, in bytes, of the chunks built by CreateChunk; also the
// value the mock producers report from bytes_to_send_in_next_message.
constexpr size_t kPayloadSize = 4;
// Matches an optional produced chunk that holds a value whose message ID
// equals `mid`. When the match fails, the reason is streamed to the result
// listener.
MATCHER_P(HasDataWithMid, mid, "") {
  // Nothing was produced, so there is no message ID to compare against.
  if (!arg.has_value()) {
    *result_listener << "There was no produced data";
    return false;
  }

  const auto& actual_mid = arg->data.message_id;
  const bool mismatch = actual_mid != mid;
  if (mismatch) {
    *result_listener << "the produced data had mid " << *actual_mid
                     << " and not the expected " << *mid;
  }
  return !mismatch;
}
// Returns a producer callback that, on every invocation, yields a chunk on
// stream `sid` with message ID `mid` and a payload of `payload_size` bytes.
// The chunk is unordered and flagged as both the beginning and the end of its
// message. The arguments passed to the callback are ignored.
std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
  auto make_chunk = [sid, mid, payload_size](TimeMs /*now*/,
                                             size_t /*max_size*/) {
    return SendQueue::DataToSend(
        Data(sid, SSN(0), mid, FSN(0), PPID(42),
             std::vector<uint8_t>(payload_size), Data::IsBeginning(true),
             Data::IsEnd(true), IsUnordered(true)));
  };
  return make_chunk;
}
// Calls `scheduler.Produce` exactly `packets_to_generate` times and returns,
// per stream ID, how many of those calls yielded data from that stream. Calls
// that yield nothing are not counted.
std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
                                           size_t packets_to_generate) {
  std::map<StreamID, size_t> counts_per_stream;
  for (size_t remaining = packets_to_generate; remaining > 0; --remaining) {
    const absl::optional<SendQueue::DataToSend> produced =
        scheduler.Produce(TimeMs(0), kMtu);
    if (!produced.has_value()) {
      continue;
    }
    counts_per_stream[produced->data.stream_id] += 1;
  }
  return counts_per_stream;
}
// gMock implementation of StreamScheduler::StreamProducer. Tests use it to
// script what a stream produces and how many bytes it reports for its next
// message.
class MockStreamProducer : public StreamScheduler::StreamProducer {
 public:
  MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
              Produce,
              (TimeMs now, size_t max_size),
              (override));
  MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
};
// Test helper bundling a strict mock producer with the scheduler stream that
// is created for it. The producer yields, without limit, chunks with MID(0)
// and `packet_size` payload bytes, and always reports `packet_size` bytes for
// its next message. The stream is made active during construction.
class TestStream {
 public:
  TestStream(StreamScheduler& scheduler,
             StreamID stream_id,
             StreamPriority priority,
             size_t packet_size = kPayloadSize) {
    // Script the producer first, then create and activate the stream.
    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
        .WillRepeatedly(Return(packet_size));
    EXPECT_CALL(producer_, Produce)
        .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));

    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
    stream_->MaybeMakeActive();
  }

  // Gives access to the scheduler stream owned by this helper.
  StreamScheduler::Stream& stream() { return *stream_; }

 private:
  // Declared before `stream_`, so it is destroyed after the stream.
  StrictMock<MockStreamProducer> producer_;
  std::unique_ptr<StreamScheduler::Stream> stream_;
};
// A scheduler on which no stream has been created has nothing to produce.
TEST(StreamSchedulerTest, HasNoActiveStreams) {
  StreamScheduler scheduler;

  const absl::optional<SendQueue::DataToSend> produced =
      scheduler.Produce(TimeMs(0), kMtu);

  EXPECT_EQ(produced, absl::nullopt);
}
// A stream reports the ID and priority it was created with, and a priority
// set afterwards is returned by the getter.
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
  const StreamID stream_id(1);
  const StreamPriority initial_priority(2);
  const StreamPriority updated_priority(0);

  StreamScheduler scheduler;
  StrictMock<MockStreamProducer> producer;
  auto stream = scheduler.CreateStream(&producer, stream_id, initial_priority);

  EXPECT_EQ(stream->stream_id(), stream_id);
  EXPECT_EQ(stream->priority(), initial_priority);

  stream->set_priority(updated_priority);
  EXPECT_EQ(stream->priority(), updated_priority);
}
// A scheduler with a single stream produces packets from it.
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
  const TimeMs now(0);
  StreamScheduler scheduler;

  // The producer is scripted with exactly one message.
  StrictMock<MockStreamProducer> producer;
  EXPECT_CALL(producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(0));
  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));

  auto single_stream =
      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
  single_stream->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(0)));
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);
}
// With two equally prioritized streams, the scheduler alternates between them
// after every packet.
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
  const TimeMs now(0);
  StreamScheduler scheduler;

  // First stream: three messages, MID 100..102.
  StrictMock<MockStreamProducer> first_producer;
  EXPECT_CALL(first_producer, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(first_producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto first_stream =
      scheduler.CreateStream(&first_producer, StreamID(1), StreamPriority(2));
  first_stream->MaybeMakeActive();

  // Second stream: three messages, MID 200..202.
  StrictMock<MockStreamProducer> second_producer;
  EXPECT_CALL(second_producer, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(second_producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto second_stream =
      scheduler.CreateStream(&second_producer, StreamID(2), StreamPriority(2));
  second_stream->MaybeMakeActive();

  // The output interleaves the two streams, one packet at a time.
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(200)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(102)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(202)));
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);
}
// Switches between two streams after every packet, but keeps producing from
// the same stream when a packet consists of multiple fragments.
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
  StreamScheduler scheduler;

  // Returns a producer action that yields one fragment of MID(101) on stream
  // 1. Each fragment of the message gets its own fragment sequence number.
  const auto fragment_of_mid_101 = [](FSN fsn, bool is_beginning,
                                      bool is_end) {
    return [fsn, is_beginning, is_end](TimeMs /*now*/, size_t /*max_size*/) {
      return SendQueue::DataToSend(
          Data(StreamID(1), SSN(0), MID(101), fsn, PPID(42),
               std::vector<uint8_t>(kPayloadSize),
               Data::IsBeginning(is_beginning), Data::IsEnd(is_end),
               IsUnordered(true)));
    };
  };

  // Stream 1: MID(100), then MID(101) split into three fragments (first,
  // middle, last), then MID(102).
  StrictMock<MockStreamProducer> producer1;
  EXPECT_CALL(producer1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(fragment_of_mid_101(FSN(0), /*is_beginning=*/true,
                                    /*is_end=*/false))
      .WillOnce(fragment_of_mid_101(FSN(1), /*is_beginning=*/false,
                                    /*is_end=*/false))
      .WillOnce(fragment_of_mid_101(FSN(2), /*is_beginning=*/false,
                                    /*is_end=*/true))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream1 =
      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
  stream1->MaybeMakeActive();

  // Stream 2: three unfragmented messages.
  StrictMock<MockStreamProducer> producer2;
  EXPECT_CALL(producer2, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto stream2 =
      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
  stream2->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
  // All three fragments of MID(101) are produced back to back, before the
  // scheduler moves on to stream 2.
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
  EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
  EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
}
// Deactivates a stream before it has finished producing all packets.
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
  const TimeMs now(0);
  StreamScheduler scheduler;

  StrictMock<MockStreamProducer> producer;
  EXPECT_CALL(producer, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)));
  EXPECT_CALL(producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize));  // hints that a MID(102) is coming.
  auto stream =
      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
  stream->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(101)));

  // ... but the stream is made inactive before the hinted message can be
  // produced, so the scheduler has nothing to return.
  stream->MakeInactive();
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);
}
// Resumes a paused stream - makes a stream active after inactivating it.
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
  const TimeMs now(0);
  StreamScheduler scheduler;

  // The callbacks are set up so that, after MID(101) has been produced, they
  // still hint that a MID(102) is coming...
  StrictMock<MockStreamProducer> producer;
  EXPECT_CALL(producer, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active again
      .WillOnce(Return(0));
  auto stream =
      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
  stream->MaybeMakeActive();

  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(101)));

  // While paused, nothing is produced.
  stream->MakeInactive();
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);

  // Once resumed, the remaining message is produced.
  stream->MaybeMakeActive();
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(102)));
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);
}
// Iterates between streams, where one is suddenly paused and later resumed.
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
  const TimeMs now(0);
  StreamScheduler scheduler;

  // First stream: paused after its first message and resumed later.
  StrictMock<MockStreamProducer> first_producer;
  EXPECT_CALL(first_producer, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
  EXPECT_CALL(first_producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto first_stream =
      scheduler.CreateStream(&first_producer, StreamID(1), StreamPriority(2));
  first_stream->MaybeMakeActive();

  // Second stream: stays active throughout.
  StrictMock<MockStreamProducer> second_producer;
  EXPECT_CALL(second_producer, Produce)
      .WillOnce(CreateChunk(StreamID(2), MID(200)))
      .WillOnce(CreateChunk(StreamID(2), MID(201)))
      .WillOnce(CreateChunk(StreamID(2), MID(202)));
  EXPECT_CALL(second_producer, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
  auto second_stream =
      scheduler.CreateStream(&second_producer, StreamID(2), StreamPriority(2));
  second_stream->MaybeMakeActive();

  // Both streams active: packets alternate.
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(100)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(200)));

  // First stream paused: only the second stream is served.
  first_stream->MakeInactive();
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(201)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(202)));

  // First stream resumed: its remaining messages are produced.
  first_stream->MaybeMakeActive();
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(101)));
  EXPECT_THAT(scheduler.Produce(now, kMtu), HasDataWithMid(MID(102)));
  EXPECT_EQ(scheduler.Produce(now, kMtu), absl::nullopt);
}
// Verifies that packet counts are evenly distributed in round robin
// scheduling: ten packets from two equally prioritized streams split 5/5.
TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
  StreamScheduler scheduler;
  TestStream stream_a(scheduler, StreamID(1), StreamPriority(1));
  TestStream stream_b(scheduler, StreamID(2), StreamPriority(1));

  std::map<StreamID, size_t> counts = GetPacketCounts(scheduler, 10);

  EXPECT_EQ(counts[StreamID(1)], 5U);
  EXPECT_EQ(counts[StreamID(2)], 5U);
}
// Verifies that packet counts are evenly distributed among active streams,
// where a stream is suddenly made inactive, two are added, and then the paused
// stream is resumed.
TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
  StreamScheduler scheduler;
  TestStream stream_a(scheduler, StreamID(1), StreamPriority(1));
  TestStream stream_b(scheduler, StreamID(2), StreamPriority(1));

  // Phase 1: two active streams share ten packets.
  std::map<StreamID, size_t> initial_counts = GetPacketCounts(scheduler, 10);
  EXPECT_EQ(initial_counts[StreamID(1)], 5U);
  EXPECT_EQ(initial_counts[StreamID(2)], 5U);

  // Phase 2: stream 2 is paused and streams 3 and 4 are added, so fifteen
  // packets are shared by the three active streams.
  stream_b.stream().MakeInactive();
  TestStream stream_c(scheduler, StreamID(3), StreamPriority(1));
  TestStream stream_d(scheduler, StreamID(4), StreamPriority(1));
  std::map<StreamID, size_t> counts_while_paused =
      GetPacketCounts(scheduler, 15);
  EXPECT_EQ(counts_while_paused[StreamID(1)], 5U);
  EXPECT_EQ(counts_while_paused[StreamID(2)], 0U);
  EXPECT_EQ(counts_while_paused[StreamID(3)], 5U);
  EXPECT_EQ(counts_while_paused[StreamID(4)], 5U);

  // Phase 3: stream 2 is resumed, so twenty packets are shared by all four.
  stream_b.stream().MaybeMakeActive();
  std::map<StreamID, size_t> counts_after_resume =
      GetPacketCounts(scheduler, 20);
  EXPECT_EQ(counts_after_resume[StreamID(1)], 5U);
  EXPECT_EQ(counts_after_resume[StreamID(2)], 5U);
  EXPECT_EQ(counts_after_resume[StreamID(3)], 5U);
  EXPECT_EQ(counts_after_resume[StreamID(4)], 5U);
}
} // namespace
} // namespace dcsctp