| /* |
| * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| #include "net/dcsctp/tx/fcfs_send_queue.h" |
| |
| #include <cstdint> |
| #include <type_traits> |
| #include <vector> |
| |
| #include "net/dcsctp/packet/data.h" |
| #include "net/dcsctp/public/dcsctp_message.h" |
| #include "net/dcsctp/public/dcsctp_options.h" |
| #include "net/dcsctp/public/dcsctp_socket.h" |
| #include "net/dcsctp/public/types.h" |
| #include "net/dcsctp/tx/send_queue.h" |
| #include "rtc_base/gunit.h" |
| #include "test/gmock.h" |
| |
| namespace dcsctp { |
| namespace { |
| |
| constexpr TimeMs kNow = TimeMs(0); |
| constexpr StreamID kStreamID(1); |
| constexpr PPID kPPID(53); |
| |
| class FCFSSendQueueTest : public testing::Test { |
| protected: |
| FCFSSendQueueTest() : buf_("log: ", 100) {} |
| |
| const DcSctpOptions options_; |
| FCFSSendQueue buf_; |
| }; |
| |
| TEST_F(FCFSSendQueueTest, EmptyBuffer) { |
| EXPECT_TRUE(buf_.IsEmpty()); |
| EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); |
| EXPECT_FALSE(buf_.IsFull()); |
| } |
| |
| TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) { |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); |
| |
| EXPECT_FALSE(buf_.IsEmpty()); |
| EXPECT_FALSE(buf_.IsFull()); |
| absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_opt.has_value()); |
| EXPECT_TRUE(chunk_opt->data.is_beginning); |
| EXPECT_TRUE(chunk_opt->data.is_end); |
| } |
| |
| TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) { |
| std::vector<uint8_t> payload(60); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_beg = |
| buf_.Produce(kNow, /*max_size=*/20); |
| ASSERT_TRUE(chunk_beg.has_value()); |
| EXPECT_TRUE(chunk_beg->data.is_beginning); |
| EXPECT_FALSE(chunk_beg->data.is_end); |
| |
| absl::optional<SendQueue::DataToSend> chunk_mid = |
| buf_.Produce(kNow, /*max_size=*/20); |
| ASSERT_TRUE(chunk_mid.has_value()); |
| EXPECT_FALSE(chunk_mid->data.is_beginning); |
| EXPECT_FALSE(chunk_mid->data.is_end); |
| |
| absl::optional<SendQueue::DataToSend> chunk_end = |
| buf_.Produce(kNow, /*max_size=*/20); |
| ASSERT_TRUE(chunk_end.has_value()); |
| EXPECT_FALSE(chunk_end->data.is_beginning); |
| EXPECT_TRUE(chunk_end->data.is_end); |
| |
| EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); |
| } |
| |
| TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) { |
| std::vector<uint8_t> payload(60); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| EXPECT_EQ(chunk_one->data.ppid, kPPID); |
| EXPECT_TRUE(chunk_one->data.is_beginning); |
| EXPECT_TRUE(chunk_one->data.is_end); |
| |
| absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); |
| EXPECT_EQ(chunk_two->data.ppid, PPID(54)); |
| EXPECT_TRUE(chunk_two->data.is_beginning); |
| EXPECT_TRUE(chunk_two->data.is_end); |
| } |
| |
| TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) { |
| std::vector<uint8_t> payload(60); |
| EXPECT_FALSE(buf_.IsFull()); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| EXPECT_FALSE(buf_.IsFull()); |
| buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); |
| EXPECT_TRUE(buf_.IsFull()); |
| // However, it's still possible to add messages. It's a soft limit, and it |
| // might be necessary to forcefully add messages due to e.g. external |
| // fragmentation. |
| buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); |
| EXPECT_TRUE(buf_.IsFull()); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| EXPECT_EQ(chunk_one->data.ppid, kPPID); |
| |
| EXPECT_TRUE(buf_.IsFull()); |
| |
| absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); |
| EXPECT_EQ(chunk_two->data.ppid, PPID(54)); |
| |
| EXPECT_FALSE(buf_.IsFull()); |
| EXPECT_FALSE(buf_.IsEmpty()); |
| |
| absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_three.has_value()); |
| EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); |
| EXPECT_EQ(chunk_three->data.ppid, PPID(55)); |
| |
| EXPECT_FALSE(buf_.IsFull()); |
| EXPECT_TRUE(buf_.IsEmpty()); |
| } |
| |
| TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) { |
| std::vector<uint8_t> payload(FCFSSendQueue::kMinimumFragmentedPayload + 1); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| // Wouldn't fit enough payload (wouldn't want to fragment) |
| EXPECT_FALSE( |
| buf_.Produce(kNow, |
| /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1) |
| .has_value()); |
| |
| // Minimum fragment |
| absl::optional<SendQueue::DataToSend> chunk_one = |
| buf_.Produce(kNow, |
| /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| EXPECT_EQ(chunk_one->data.ppid, kPPID); |
| |
| // There is only one byte remaining - it can be fetched as it doesn't require |
| // additional fragmentation. |
| absl::optional<SendQueue::DataToSend> chunk_two = |
| buf_.Produce(kNow, /*max_size=*/1); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_EQ(chunk_two->data.stream_id, kStreamID); |
| EXPECT_EQ(chunk_two->data.ppid, kPPID); |
| |
| EXPECT_TRUE(buf_.IsEmpty()); |
| } |
| |
| TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) { |
| std::vector<uint8_t> payload(20); |
| |
| // Default is ordered |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| absl::optional<SendQueue::DataToSend> chunk_one = |
| buf_.Produce(kNow, /*max_size=*/100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_FALSE(chunk_one->data.is_unordered); |
| |
| // Explicitly unordered. |
| SendOptions opts; |
| opts.unordered = IsUnordered(true); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts); |
| absl::optional<SendQueue::DataToSend> chunk_two = |
| buf_.Produce(kNow, /*max_size=*/100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_TRUE(chunk_two->data.is_unordered); |
| } |
| |
| TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) { |
| std::vector<uint8_t> payload(20); |
| |
| // Default is no expiry |
| TimeMs now = kNow; |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload)); |
| now = now + DurationMs(1000000); |
| ASSERT_TRUE(buf_.Produce(now, 100)); |
| |
| SendOptions expires_2_seconds; |
| expires_2_seconds.lifetime = DurationMs(2000); |
| |
| // Add and consume within lifetime |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); |
| now = now + DurationMs(1999); |
| ASSERT_TRUE(buf_.Produce(now, 100)); |
| |
| // Add and consume just outside lifetime |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); |
| now = now + DurationMs(2000); |
| ASSERT_FALSE(buf_.Produce(now, 100)); |
| |
| // A long time after expiry |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); |
| now = now + DurationMs(1000000); |
| ASSERT_FALSE(buf_.Produce(now, 100)); |
| |
| // Expire one message, but produce the second that is not expired. |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds); |
| |
| SendOptions expires_4_seconds; |
| expires_4_seconds.lifetime = DurationMs(4000); |
| |
| buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds); |
| now = now + DurationMs(2000); |
| |
| ASSERT_TRUE(buf_.Produce(now, 100)); |
| ASSERT_FALSE(buf_.Produce(now, 100)); |
| } |
| |
| TEST_F(FCFSSendQueueTest, DiscardPartialPackets) { |
| std::vector<uint8_t> payload(120); |
| |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_FALSE(chunk_one->data.is_end); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, |
| chunk_one->data.message_id); |
| |
| absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_FALSE(chunk_two->data.is_end); |
| EXPECT_EQ(chunk_two->data.stream_id, StreamID(2)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_three.has_value()); |
| EXPECT_TRUE(chunk_three->data.is_end); |
| EXPECT_EQ(chunk_three->data.stream_id, StreamID(2)); |
| ASSERT_FALSE(buf_.Produce(kNow, 100)); |
| |
| // Calling it again shouldn't cause issues. |
| buf_.Discard(IsUnordered(false), chunk_one->data.stream_id, |
| chunk_one->data.message_id); |
| ASSERT_FALSE(buf_.Produce(kNow, 100)); |
| } |
| |
| TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) { |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); |
| buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); |
| EXPECT_EQ(buf_.total_bytes(), 8u); |
| |
| buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); |
| EXPECT_EQ(buf_.total_bytes(), 5u); |
| buf_.CommitResetStreams(); |
| buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)})); |
| EXPECT_EQ(buf_.total_bytes(), 0u); |
| } |
| |
| TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) { |
| std::vector<uint8_t> payload(120); |
| |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| EXPECT_EQ(buf_.total_bytes(), 2 * payload.size() - 50); |
| |
| StreamID stream_ids[] = {StreamID(1)}; |
| buf_.PrepareResetStreams(stream_ids); |
| EXPECT_EQ(buf_.total_bytes(), payload.size() - 50); |
| } |
| |
| TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { |
| std::vector<uint8_t> payload(50); |
| |
| buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); |
| EXPECT_EQ(buf_.total_bytes(), 0u); |
| |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| EXPECT_EQ(buf_.total_bytes(), payload.size()); |
| |
| EXPECT_FALSE(buf_.Produce(kNow, 100).has_value()); |
| buf_.CommitResetStreams(); |
| EXPECT_EQ(buf_.total_bytes(), payload.size()); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.stream_id, kStreamID); |
| EXPECT_EQ(buf_.total_bytes(), 0u); |
| } |
| |
| TEST_F(FCFSSendQueueTest, CommittingResetsSSN) { |
| std::vector<uint8_t> payload(50); |
| |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.ssn, SSN(0)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_EQ(chunk_two->data.ssn, SSN(1)); |
| |
| StreamID stream_ids[] = {StreamID(1)}; |
| buf_.PrepareResetStreams(stream_ids); |
| |
| // Buffered |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| EXPECT_TRUE(buf_.CanResetStreams()); |
| buf_.CommitResetStreams(); |
| |
| absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_three.has_value()); |
| EXPECT_EQ(chunk_three->data.ssn, SSN(0)); |
| } |
| |
| TEST_F(FCFSSendQueueTest, RollBackResumesSSN) { |
| std::vector<uint8_t> payload(50); |
| |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_one.has_value()); |
| EXPECT_EQ(chunk_one->data.ssn, SSN(0)); |
| |
| absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_two.has_value()); |
| EXPECT_EQ(chunk_two->data.ssn, SSN(1)); |
| |
| buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)})); |
| |
| // Buffered |
| buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); |
| |
| EXPECT_TRUE(buf_.CanResetStreams()); |
| buf_.RollbackResetStreams(); |
| |
| absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100); |
| ASSERT_TRUE(chunk_three.has_value()); |
| EXPECT_EQ(chunk_three->data.ssn, SSN(2)); |
| } |
| |
| } // namespace |
| } // namespace dcsctp |