Reapply "dcsctp: Add per-stream-limit, refactor limits."

Keeping the old setting for the total queue size
limit, which avoids breaking a downstream.

This reverts commit 47ce449afaf9ba38785437fdd338630cad24a77b
and relands commit 4c990e2e56157175324e651f95f3d8c6a0e5c030.

Bug: chromium:40072842
Change-Id: I1e7d14b5d0026232d1fc9277172b6947b8be3490
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343120
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41907}
diff --git a/net/dcsctp/public/dcsctp_options.h b/net/dcsctp/public/dcsctp_options.h
index 600e8a3..221a856 100644
--- a/net/dcsctp/public/dcsctp_options.h
+++ b/net/dcsctp/public/dcsctp_options.h
@@ -85,10 +85,14 @@
   // buffer is fully utilized.
   size_t max_receiver_window_buffer_size = 5 * 1024 * 1024;
 
-  // Maximum send buffer size. It will not be possible to queue more data than
-  // this before sending it.
+  // Send queue total size limit. It will not be possible to queue more data if
+  // the queue size is larger than this number.
   size_t max_send_buffer_size = 2'000'000;
 
+  // Per stream send queue size limit. Similar to `max_send_buffer_size`, but
+  // limiting the size of individual streams.
+  size_t per_stream_send_queue_limit = 2'000'000;
+
   // A threshold that, when the amount of data in the send buffer goes below
   // this value, will trigger `DcSctpCallbacks::OnTotalBufferedAmountLow`.
   size_t total_buffered_amount_low_threshold = 1'800'000;
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 0667e6f..d197a38 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -215,7 +215,6 @@
                      absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
       send_queue_(log_prefix_,
                   &callbacks_,
-                  options_.max_send_buffer_size,
                   options_.mtu,
                   options_.default_stream_priority,
                   options_.total_buffered_amount_low_threshold) {}
@@ -544,7 +543,9 @@
                        "Unable to send message as the socket is shutting down");
     return SendStatus::kErrorShuttingDown;
   }
-  if (send_queue_.IsFull()) {
+  if (send_queue_.total_buffered_amount() >= options_.max_send_buffer_size ||
+      send_queue_.buffered_amount(message.stream_id()) >=
+          options_.per_stream_send_queue_limit) {
     if (lifecycle_id.IsSet()) {
       callbacks_.OnLifecycleEnd(lifecycle_id);
     }
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index dfe8ba6..2d392d6 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1694,6 +1694,37 @@
   MaybeHandoverSocketAndSendMessage(a, std::move(z));
 }
 
+TEST(DcSctpSocketTest, RespectsPerStreamQueueLimit) {
+  DcSctpOptions options = {.max_send_buffer_size = 4000,
+                           .per_stream_send_queue_limit = 1000};
+  SocketUnderTest a("A", options);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(1), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kErrorResourceExhaustion);
+  // The per-stream limit for SID=1 is reached, but not SID=2.
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kSuccess);
+  EXPECT_EQ(a.socket.Send(
+                DcSctpMessage(StreamID(2), PPID(53), std::vector<uint8_t>(600)),
+                kSendOptions),
+            SendStatus::kErrorResourceExhaustion);
+}
+
 TEST_P(DcSctpSocketParametrizedTest, HasReasonableBufferedAmountValues) {
   SocketUnderTest a("A");
   auto z = std::make_unique<SocketUnderTest>("Z");
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 3e682fd..2193880 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -35,13 +35,11 @@
 
 RRSendQueue::RRSendQueue(absl::string_view log_prefix,
                          DcSctpSocketCallbacks* callbacks,
-                         size_t buffer_size,
                          size_t mtu,
                          StreamPriority default_priority,
                          size_t total_buffered_amount_low_threshold)
     : log_prefix_(log_prefix),
       callbacks_(*callbacks),
-      buffer_size_(buffer_size),
       default_priority_(default_priority),
       scheduler_(log_prefix_, mtu),
       total_buffered_amount_(
@@ -379,10 +377,6 @@
   RTC_DCHECK(IsConsistent());
 }
 
-bool RRSendQueue::IsFull() const {
-  return total_buffered_amount() >= buffer_size_;
-}
-
 bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index b6c359d..1a370a2 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -56,7 +56,6 @@
  public:
   RRSendQueue(absl::string_view log_prefix,
               DcSctpSocketCallbacks* callbacks,
-              size_t buffer_size,
               size_t mtu,
               StreamPriority default_priority,
               size_t total_buffered_amount_low_threshold);
@@ -271,7 +270,6 @@
 
   const absl::string_view log_prefix_;
   DcSctpSocketCallbacks& callbacks_;
-  const size_t buffer_size_;
   const StreamPriority default_priority_;
   OutgoingMessageId current_message_id = OutgoingMessageId(0);
   StreamScheduler scheduler_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 632cd8f..9beba95 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -35,7 +35,6 @@
 constexpr Timestamp kNow = Timestamp::Zero();
 constexpr StreamID kStreamID(1);
 constexpr PPID kPPID(53);
-constexpr size_t kMaxQueueSize = 1000;
 constexpr StreamPriority kDefaultPriority(10);
 constexpr size_t kBufferedAmountLowThreshold = 500;
 constexpr size_t kOneFragmentPacketSize = 100;
@@ -47,7 +46,7 @@
   RRSendQueueTest()
       : buf_("log: ",
              &callbacks_,
-             kMaxQueueSize,
+
              kMtu,
              kDefaultPriority,
              kBufferedAmountLowThreshold) {}
@@ -60,14 +59,12 @@
 TEST_F(RRSendQueueTest, EmptyBuffer) {
   EXPECT_TRUE(buf_.IsEmpty());
   EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
-  EXPECT_FALSE(buf_.IsFull());
 }
 
 TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
 
   EXPECT_FALSE(buf_.IsEmpty());
-  EXPECT_FALSE(buf_.IsFull());
   absl::optional<SendQueue::DataToSend> chunk_opt =
       buf_.Produce(kNow, kOneFragmentPacketSize);
   ASSERT_TRUE(chunk_opt.has_value());
@@ -124,30 +121,30 @@
 
 TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
   std::vector<uint8_t> payload(600);
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
   // However, it's still possible to add messages. It's a soft limit, and it
   // might be necessary to forcefully add messages due to e.g. external
   // fragmentation.
   buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
 
   absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_one.has_value());
   EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
   EXPECT_EQ(chunk_one->data.ppid, kPPID);
 
-  EXPECT_TRUE(buf_.IsFull());
+  EXPECT_GE(buf_.total_buffered_amount(), 1000u);
 
   absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 1000);
   ASSERT_TRUE(chunk_two.has_value());
   EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
   EXPECT_EQ(chunk_two->data.ppid, PPID(54));
 
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   EXPECT_FALSE(buf_.IsEmpty());
 
   absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 1000);
@@ -155,7 +152,7 @@
   EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
   EXPECT_EQ(chunk_three->data.ppid, PPID(55));
 
-  EXPECT_FALSE(buf_.IsFull());
+  EXPECT_LT(buf_.total_buffered_amount(), 1000u);
   EXPECT_TRUE(buf_.IsEmpty());
 }
 
@@ -813,7 +810,7 @@
   DcSctpSocketHandoverState state;
   buf_.AddHandoverState(state);
 
-  RRSendQueue q2("log: ", &callbacks_, kMaxQueueSize, kMtu, kDefaultPriority,
+  RRSendQueue q2("log: ", &callbacks_, kMtu, kDefaultPriority,
                  kBufferedAmountLowThreshold);
   q2.RestoreFromState(state);
   EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));