Reapply "dcsctp: Add per-stream-limit, refactor limits." Keeping the old setting for the total queue size limit, which avoids breaking a downstream. This reverts commit 47ce449afaf9ba38785437fdd338630cad24a77b and relands commit 4c990e2e56157175324e651f95f3d8c6a0e5c030. Bug: chromium:40072842 Change-Id: I1e7d14b5d0026232d1fc9277172b6947b8be3490 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343120 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Florent Castelli <orphis@webrtc.org> Cr-Commit-Position: refs/heads/main@{#41907}
diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h index 600e8a3..221a856 100644 --- a/net/dcsctp/public/dcsctp_options.h +++ b/net/dcsctp/public/dcsctp_options.h
@@ -85,10 +85,14 @@ // buffer is fully utilized. size_t max_receiver_window_buffer_size = 5 * 1024 * 1024; - // Maximum send buffer size. It will not be possible to queue more data than - // this before sending it. + // Send queue total size limit. It will not be possible to queue more data if + // the queue size is larger than this number. size_t max_send_buffer_size = 2'000'000; + // Per stream send queue size limit. Similar to `max_send_buffer_size`, but + // limiting the size of individual streams. + size_t per_stream_send_queue_limit = 2'000'000; + // A threshold that, when the amount of data in the send buffer goes below // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`. size_t total_buffered_amount_low_threshold = 1'800'000;
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc index 0667e6f..d197a38 100644 --- a/net/dcsctp/socket/dcsctp_socket.cc +++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -215,7 +215,6 @@ absl::bind_front(&DcSctpSocket::OnSentPacket, this)), send_queue_(log_prefix_, &callbacks_, - options_.max_send_buffer_size, options_.mtu, options_.default_stream_priority, options_.total_buffered_amount_low_threshold) {} @@ -544,7 +543,9 @@ "Unable to send message as the socket is shutting down"); return SendStatus::kErrorShuttingDown; } - if (send_queue_.IsFull()) { + if (send_queue_.total_buffered_amount() >= options_.max_send_buffer_size || + send_queue_.buffered_amount(message.stream_id()) >= + options_.per_stream_send_queue_limit) { if (lifecycle_id.IsSet()) { callbacks_.OnLifecycleEnd(lifecycle_id); }
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc index dfe8ba6..2d392d6 100644 --- a/net/dcsctp/socket/dcsctp_socket_test.cc +++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1694,6 +1694,37 @@ MaybeHandoverSocketAndSendMessage(a, std::move(z)); } +TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) { + DcSctpOptions options = {.max_send_buffer_size = 4000, + .per_stream_send_queue_limit = 1000}; + SocketUnderTest a("A", options); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); + // The per-stream limit for SID=1 is reached, but not SID=2. + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kSuccess); + EXPECT_EQ(a.socket.Send( + DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)), + kSendOptions), + SendStatus::kErrorResourceExhaustion); +} + TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) { SocketUnderTest a("A"); auto z = std::make_unique<SocketUnderTest>("Z");
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 3e682fd..2193880 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -35,13 +35,11 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold) : log_prefix_(log_prefix), callbacks_(*callbacks), - buffer_size_(buffer_size), default_priority_(default_priority), scheduler_(log_prefix_, mtu), total_buffered_amount_( @@ -379,10 +377,6 @@ RTC_DCHECK(IsConsistent()); } -bool RRSendQueue::IsFull() const { - return total_buffered_amount() >= buffer_size_; -} - bool RRSendQueue::IsEmpty() const { return total_buffered_amount() == 0; }
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index b6c359d..1a370a2 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h
@@ -56,7 +56,6 @@ public: RRSendQueue(absl::string_view log_prefix, DcSctpSocketCallbacks* callbacks, - size_t buffer_size, size_t mtu, StreamPriority default_priority, size_t total_buffered_amount_low_threshold); @@ -271,7 +270,6 @@ const absl::string_view log_prefix_; DcSctpSocketCallbacks& callbacks_; - const size_t buffer_size_; const StreamPriority default_priority_; OutgoingMessageId current_message_id = OutgoingMessageId(0); StreamScheduler scheduler_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc index 632cd8f..9beba95 100644 --- a/net/dcsctp/tx/rr_send_queue_test.cc +++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -35,7 +35,6 @@ constexpr Timestamp kNow = Timestamp::Zero(); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); -constexpr size_t kMaxQueueSize = 1000; constexpr StreamPriority kDefaultPriority(10); constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; @@ -47,7 +46,7 @@ RRSendQueueTest() : buf_("log: ", &callbacks_, - kMaxQueueSize, + kMtu, kDefaultPriority, kBufferedAmountLowThreshold) {} @@ -60,14 +59,12 @@ TEST_F(RRSendQueueTest, EmptyBuffer) { EXPECT_TRUE(buf_.IsEmpty()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); - EXPECT_FALSE(buf_.IsFull()); } TEST_F(RRSendQueueTest, AddAndGetSingleChunk) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6})); EXPECT_FALSE(buf_.IsEmpty()); - EXPECT_FALSE(buf_.IsFull()); absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, kOneFragmentPacketSize); ASSERT_TRUE(chunk_opt.has_value()); @@ -124,30 +121,30 @@ TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) { std::vector<uint8_t> payload(600); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); // However, it's still possible to add messages. It's a soft limit, and it // might be necessary to forcefully add messages due to e.g. external // fragmentation. buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload)); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); EXPECT_EQ(chunk_one->data.ppid, kPPID); - EXPECT_TRUE(buf_.IsFull()); + EXPECT_GE(buf_.total_buffered_amount(), 1000u); absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000); ASSERT_TRUE(chunk_two.has_value()); EXPECT_EQ(chunk_two->data.stream_id, StreamID(3)); EXPECT_EQ(chunk_two->data.ppid, PPID(54)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_FALSE(buf_.IsEmpty()); absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000); @@ -155,7 +152,7 @@ EXPECT_EQ(chunk_three->data.stream_id, StreamID(5)); EXPECT_EQ(chunk_three->data.ppid, PPID(55)); - EXPECT_FALSE(buf_.IsFull()); + EXPECT_LT(buf_.total_buffered_amount(), 1000u); EXPECT_TRUE(buf_.IsEmpty()); } @@ -813,7 +810,7 @@ DcSctpSocketHandoverState state; buf_.AddHandoverState(state); - RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority, + RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority, kBufferedAmountLowThreshold); q2.RestoreFromState(state); EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));