dcsctp: Add per-stream-limit, refactor limits.

The limits have been moved out from the Send Queue as they were enforced
outside the queue anyway (in the socket). That was a preparation for
adding even more limits; There is now also a per-stream limit, allowing
individual streams to have one (global) limit, and the entire socket to
have another limit.

These limits are very small in the default options. In Chrome, the limit
is 16MB per stream, so expect the defaults to be updated when the
additional buffering outside dcSCTP is removed.

Bug: chromium:41221056
Change-Id: I9f835be05d349cbfce3e9235d34b5ea0e2fe87d1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/342481
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41895}
diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h
index 600e8a3..6a6a8cd 100644
--- a/net/dcsctp/public/dcsctp_options.h
+++ b/net/dcsctp/public/dcsctp_options.h
@@ -85,9 +85,13 @@
   // buffer is fully utilized.
   size_t max_receiver_window_buffer_size = 5 * 1024 * 1024;
 
-  // Maximum send buffer size. It will not be possible to queue more data than
-  // this before sending it.
-  size_t max_send_buffer_size = 2'000'000;
+  // Send queue total size limit. It will not be possible to queue more data if
+  // the queue size is larger than this number.
+  size_t total_send_queue_limit = 2'000'000;
+
+  // Per stream send queue size limit. Similar to `total_send_queue_limit`, but
+  // limiting the size of individual streams.
+  size_t per_stream_send_queue_limit = 2'000'000;
 
   // A threshold that, when the amount of data in the send buffer goes below
   // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 0667e6f..c92ec37 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -215,7 +215,6 @@
                      absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
       send_queue_(log_prefix_,
                   &callbacks_,
-                  options_.max_send_buffer_size,
                   options_.mtu,
                   options_.default_stream_priority,
                   options_.total_buffered_amount_low_threshold) {}
@@ -544,7 +543,9 @@
                        "Unable to send message as the socket is shutting down");
     return SendStatus::kErrorShuttingDown;
   }
-  if (send_queue_.IsFull()) {
+  if (send_queue_.total_buffered_amount() >= options_.total_send_queue_limit ||
+      send_queue_.buffered_amount(message.stream_id()) >=
+          options_.per_stream_send_queue_limit) {
     if (lifecycle_id.IsSet()) {
       callbacks_.OnLifecycleEnd(lifecycle_id);
     }
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index dfe8ba6..d64774b 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1694,6 +1694,37 @@
   MaybeHandoverSocketAndSendMessage(a, std::move(z));
 }
 
+TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) {
+  DcSctpOptions options = {.total_send_queue_limit = 4000,
+                           .per_stream_send_queue_limit = 1000};
+  SocketUnderTest a("A", options);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kErrorResourceExhaustion);
+  // The per-stream limit for SID=1 is reached, but not SID=2.
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kErrorResourceExhaustion);
+}
+
 TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) {
   SocketUnderTest a("A");
   auto z = std::make_unique<SocketUnderTest>("Z");
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 3e682fd..2193880 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -35,13 +35,11 @@
 
 RRSendQueue::RRSendQueue(absl::string_view log_prefix,
                          DcSctpSocketCallbacks* callbacks,
-                         size_t buffer_size,
                          size_t mtu,
                          StreamPriority default_priority,
                          size_t total_buffered_amount_low_threshold)
     : log_prefix_(log_prefix),
       callbacks_(*callbacks),
-      buffer_size_(buffer_size),
       default_priority_(default_priority),
       scheduler_(log_prefix_, mtu),
       total_buffered_amount_(
@@ -379,10 +377,6 @@
   RTC_DCHECK(IsConsistent());
 }
 
-bool RRSendQueue::IsFull() const {
-  return total_buffered_amount() >= buffer_size_;
-}
-
 bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index b6c359d..1a370a2 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -56,7 +56,6 @@
  public:
   RRSendQueue(absl::string_view log_prefix,
               DcSctpSocketCallbacks* callbacks,
-              size_t buffer_size,
               size_t mtu,
               StreamPriority default_priority,
               size_t total_buffered_amount_low_threshold);
@@ -271,7 +270,6 @@
 
   const absl::string_view log_prefix_;
   DcSctpSocketCallbacks& callbacks_;
-  const size_t buffer_size_;
   const StreamPriority default_priority_;
   OutgoingMessageId current_message_id = OutgoingMessageId(0);
   StreamScheduler scheduler_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 632cd8f..9beba95 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -35,7 +35,6 @@
 constexpr Timestamp kNow = Timestamp::Zero();
 constexpr StreamID kStreamID(1);
 constexpr PPID kPPID(53);
-constexpr size_t kMaxQueueSize = 1000;
 constexpr StreamPriority kDefaultPriority(10);
 constexpr size_t kBufferedAmountLowThreshold = 500;
 constexpr size_t kOneFragmentPacketSize = 100;
@@ -47,7 +46,7 @@
   RRSendQueueTest()
       : buf_("log: ",
              &callbacks_,
-             kMaxQueueSize,
+
              kMtu,
              kDefaultPriority,
              kBufferedAmountLowThreshold) {}
@@ -60,14 +59,12 @@
 TEST_F(RRSendQueueTest, EmptyBuffer) {
   EXPECT_TRUE(buf_.IsEmpty());
   EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
-  EXPECT_FALSE(buf_.IsFull());
 }
 
 TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
 
   EXPECT_FALSE(buf_.IsEmpty());
-  EXPECT_FALSE(buf_.IsFull());
   absl::optional<SendQueue::DataToSend> chunk_opt =
       buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_opt.has_value());
@@ -124,30 +121,30 @@
 
 TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
   std::vector<uint8_t> payload(600);
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
   // However, it's still possible to add messages. It's a soft limit, and it
   // might be necessary to forcefully add messages due to e.g. external
   // fragmentation.
   buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
 
   absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   EXPECT_EQ(chunk_one->data.ppid, kPPID);
 
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
 
   absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
   EXPECT_EQ(chunk_two->data.ppid, PPID(54));
 
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   EXPECT_FALSE(buf_.IsEmpty());
 
   absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
@@ -155,7 +152,7 @@
   EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
   EXPECT_EQ(chunk_three->data.ppid, PPID(55));
 
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   EXPECT_TRUE(buf_.IsEmpty());
 }
 
@@ -813,7 +810,7 @@
   DcSctpSocketHandoverState state;
   buf_.AddHandoverState(state);
 
-  RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
+  RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority,
                  kBufferedAmountLowThreshold);
   q2.RestoreFromState(state);
   EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));