pc: Remove additional buffering in SctpDataChannel

This CL removes the send buffers (but not the receive buffer) from
SctpDataChannel and increases the send buffer in DcSctpSocket instead.

The reasons are:
 1) Simplify the code. This additional buffering was strictly needed
    before we migrated away from usrsctp, as that send buffer was very
    limited in size (by design). But with the migration to dcSCTP, it's
    no longer needed, so it just adds complexity.
 2) Make `RTCDataChannel::bufferedAmount` correct. Before this CL, it
    represented just the data buffered in SctpDataChannel, and not the
    data accepted by the SCTP socket, but not yet put on the wire. This
    makes it hard for clients to know when a message has ever been sent.
 3) Better handle draining data on data channel close. While this is not
    implemented in dcSCTP, having a single buffer makes this easier to
    add.

While most of this CL is straightforward, the handling of bufferedAmount
in the signaling thread (in RTCDataChannel in Blink), is a bit special.
The number returned by `RTCDataChannel::bufferedAmount` is not what the
true value is inside the SCTP socket, but an eventual consistent view
of that value. When a message is sent, the value is incremented and:
  - Before this change: When a message was put on the SCTP socket, the
    view's value was decremented. Which made the view reflect what was
    buffered outside the SCTP socket, and that buffering is now gone.
  - After this change: SctpDataChannel will track what RTCDataChannel
    will think it is, and provide updates to that number as we are
    notified that it's reduced - by setting a "low threshold" callback
    trigger.

A bonus with the new behavior is that it will be eventually consistent
and auto-heal also in error conditions - when messages are dropped due
to errors (bad input, bad state, etc). Previously, the bufferedAmount
value could drift away from the correct value on errors.

Note that a big chunk of unit tests were removed with this CL, as those
tested how the buffering behaved. Now, there is no buffering, so the
removed test cases represent a simpler interface.

This CL has been extensively tested with data channel benchmarks that
use the bufferedAmount thresholds (in Javascript).

Bug: chromium:40072842
Change-Id: I1a6a4af6b6e1116832f5028f989ce9f44683d229
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/343361
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41945}
diff --git a/media/BUILD.gn b/media/BUILD.gn
index 2a8f28a..794f7a1 100644
--- a/media/BUILD.gn
+++ b/media/BUILD.gn
@@ -721,6 +721,7 @@
       ":rtc_data_sctp_transport_internal",
       "../api:array_view",
       "../api/environment",
+      "../api:libjingle_peerconnection_api",
       "../api/task_queue:pending_task_safety_flag",
       "../api/task_queue:task_queue",
       "../net/dcsctp/public:factory",
diff --git a/media/sctp/dcsctp_transport.cc b/media/sctp/dcsctp_transport.cc
index 53a535f..99ecc94 100644
--- a/media/sctp/dcsctp_transport.cc
+++ b/media/sctp/dcsctp_transport.cc
@@ -19,6 +19,7 @@
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
+#include "api/data_channel_interface.h"
 #include "api/environment/environment.h"
 #include "media/base/media_channel.h"
 #include "net/dcsctp/public/dcsctp_socket_factory.h"
@@ -192,6 +193,10 @@
     // Don't close the connection automatically on too many retransmissions.
     options.max_retransmissions = absl::nullopt;
     options.max_init_retransmits = absl::nullopt;
+    options.per_stream_send_queue_limit =
+        DataChannelInterface::MaxSendQueueSize();
+    // This is just set to avoid denial-of-service. Practically unlimited.
+    options.max_send_buffer_size = std::numeric_limits<size_t>::max();
 
     std::unique_ptr<dcsctp::PacketObserver> packet_observer;
     if (RTC_LOG_CHECK_LEVEL(LS_VERBOSE)) {
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index a7d3627..ac229d6 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -222,266 +222,6 @@
   EXPECT_FALSE(controller_->IsConnected(inner_channel_.get()));
 }
 
-// Tests that DataChannel::buffered_amount() is correct after the channel is
-// blocked.
-TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  size_t successful_sends = 0;
-  auto send_complete = [&](RTCError err) {
-    EXPECT_TRUE(err.ok());
-    ++successful_sends;
-  };
-  channel_->SendAsync(buffer, send_complete);
-  FlushNetworkThreadAndPendingOperations();
-  EXPECT_EQ(channel_->buffered_amount(), 0u);
-  size_t successful_send_count = 1;
-  EXPECT_EQ(successful_send_count, successful_sends);
-  EXPECT_EQ(successful_send_count,
-            observer_->on_buffered_amount_change_count());
-
-  controller_->set_send_blocked(true);
-  const int number_of_packets = 3;
-  for (int i = 0; i < number_of_packets; ++i) {
-    channel_->SendAsync(buffer, send_complete);
-    ++successful_send_count;
-  }
-  FlushNetworkThreadAndPendingOperations();
-  EXPECT_EQ(buffer.data.size() * number_of_packets,
-            channel_->buffered_amount());
-  EXPECT_EQ(successful_send_count, successful_sends);
-
-  // An event should not have been fired for buffered amount.
-  EXPECT_EQ(1u, observer_->on_buffered_amount_change_count());
-
-  // Now buffered amount events should get fired and the value
-  // get down to 0u.
-  controller_->set_send_blocked(false);
-  run_loop_.Flush();
-  EXPECT_EQ(channel_->buffered_amount(), 0u);
-  EXPECT_EQ(successful_send_count, successful_sends);
-  EXPECT_EQ(successful_send_count,
-            observer_->on_buffered_amount_change_count());
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedBufferedAmountWhenBlocked) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  EXPECT_TRUE(channel_->Send(buffer));
-  size_t successful_send_count = 1;
-
-  run_loop_.Flush();
-  EXPECT_EQ(0U, channel_->buffered_amount());
-  EXPECT_EQ(successful_send_count,
-            observer_->on_buffered_amount_change_count());
-
-  controller_->set_send_blocked(true);
-
-  const int number_of_packets = 3;
-  for (int i = 0; i < number_of_packets; ++i) {
-    EXPECT_TRUE(channel_->Send(buffer));
-  }
-  EXPECT_EQ(buffer.data.size() * number_of_packets,
-            channel_->buffered_amount());
-  EXPECT_EQ(successful_send_count,
-            observer_->on_buffered_amount_change_count());
-
-  controller_->set_send_blocked(false);
-  run_loop_.Flush();
-  successful_send_count += number_of_packets;
-  EXPECT_EQ(channel_->buffered_amount(), 0u);
-  EXPECT_EQ(successful_send_count,
-            observer_->on_buffered_amount_change_count());
-}
-
-// Tests that the queued data are sent when the channel transitions from blocked
-// to unblocked.
-TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  controller_->set_send_blocked(true);
-  size_t successful_send = 0u;
-  auto send_complete = [&](RTCError err) {
-    EXPECT_TRUE(err.ok());
-    ++successful_send;
-  };
-  channel_->SendAsync(buffer, send_complete);
-  FlushNetworkThreadAndPendingOperations();
-  EXPECT_EQ(1U, successful_send);
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  controller_->set_send_blocked(false);
-  SetChannelReady();
-  EXPECT_EQ(channel_->buffered_amount(), 0u);
-  EXPECT_EQ(observer_->on_buffered_amount_change_count(), 1u);
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedQueuedDataSentWhenUnblocked) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  controller_->set_send_blocked(true);
-  EXPECT_TRUE(channel_->Send(buffer));
-
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  controller_->set_send_blocked(false);
-  SetChannelReady();
-  EXPECT_EQ(0U, channel_->buffered_amount());
-  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
-}
-
-// Tests that no crash when the channel is blocked right away while trying to
-// send queued data.
-TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  controller_->set_send_blocked(true);
-  size_t successful_send = 0u;
-  auto send_complete = [&](RTCError err) {
-    EXPECT_TRUE(err.ok());
-    ++successful_send;
-  };
-  channel_->SendAsync(buffer, send_complete);
-  FlushNetworkThreadAndPendingOperations();
-  EXPECT_EQ(1U, successful_send);
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  // Set channel ready while it is still blocked.
-  SetChannelReady();
-  EXPECT_EQ(buffer.size(), channel_->buffered_amount());
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  // Unblock the channel to send queued data again, there should be no crash.
-  controller_->set_send_blocked(false);
-  SetChannelReady();
-  EXPECT_EQ(0U, channel_->buffered_amount());
-  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedBlockedWhenSendQueuedDataNoCrash) {
-  AddObserver();
-  SetChannelReady();
-  DataBuffer buffer("abcd");
-  controller_->set_send_blocked(true);
-  EXPECT_TRUE(channel_->Send(buffer));
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  // Set channel ready while it is still blocked.
-  SetChannelReady();
-  EXPECT_EQ(buffer.size(), channel_->buffered_amount());
-  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
-
-  // Unblock the channel to send queued data again, there should be no crash.
-  controller_->set_send_blocked(false);
-  SetChannelReady();
-  EXPECT_EQ(0U, channel_->buffered_amount());
-  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
-}
-
-// Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are
-// correct, sending data both while unblocked and while blocked.
-TEST_F(SctpDataChannelTest, VerifyMessagesAndBytesSent) {
-  AddObserver();
-  SetChannelReady();
-  std::vector<DataBuffer> buffers({
-      DataBuffer("message 1"),
-      DataBuffer("msg 2"),
-      DataBuffer("message three"),
-      DataBuffer("quadra message"),
-      DataBuffer("fifthmsg"),
-      DataBuffer("message of the beast"),
-  });
-
-  // Default values.
-  EXPECT_EQ(0U, channel_->messages_sent());
-  EXPECT_EQ(0U, channel_->bytes_sent());
-
-  // Send three buffers while not blocked.
-  controller_->set_send_blocked(false);
-  for (int i : {0, 1, 2}) {
-    channel_->SendAsync(buffers[i], nullptr);
-  }
-  FlushNetworkThreadAndPendingOperations();
-
-  size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size();
-  EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
-  EXPECT_EQ(3U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-
-  // Send three buffers while blocked, queuing the buffers.
-  controller_->set_send_blocked(true);
-  for (int i : {3, 4, 5}) {
-    channel_->SendAsync(buffers[i], nullptr);
-  }
-  FlushNetworkThreadAndPendingOperations();
-  size_t bytes_queued =
-      buffers[3].size() + buffers[4].size() + buffers[5].size();
-  EXPECT_EQ(bytes_queued, channel_->buffered_amount());
-  EXPECT_EQ(3U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-
-  // Unblock and make sure everything was sent.
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
-  bytes_sent += bytes_queued;
-  EXPECT_EQ(6U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedVerifyMessagesAndBytesSent) {
-  AddObserver();
-  SetChannelReady();
-  std::vector<DataBuffer> buffers({
-      DataBuffer("message 1"),
-      DataBuffer("msg 2"),
-      DataBuffer("message three"),
-      DataBuffer("quadra message"),
-      DataBuffer("fifthmsg"),
-      DataBuffer("message of the beast"),
-  });
-
-  // Default values.
-  EXPECT_EQ(0U, channel_->messages_sent());
-  EXPECT_EQ(0U, channel_->bytes_sent());
-
-  // Send three buffers while not blocked.
-  controller_->set_send_blocked(false);
-  EXPECT_TRUE(channel_->Send(buffers[0]));
-  EXPECT_TRUE(channel_->Send(buffers[1]));
-  EXPECT_TRUE(channel_->Send(buffers[2]));
-  size_t bytes_sent = buffers[0].size() + buffers[1].size() + buffers[2].size();
-  EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
-  EXPECT_EQ(3U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-
-  // Send three buffers while blocked, queuing the buffers.
-  controller_->set_send_blocked(true);
-  EXPECT_TRUE(channel_->Send(buffers[3]));
-  EXPECT_TRUE(channel_->Send(buffers[4]));
-  EXPECT_TRUE(channel_->Send(buffers[5]));
-  size_t bytes_queued =
-      buffers[3].size() + buffers[4].size() + buffers[5].size();
-  EXPECT_EQ(bytes_queued, channel_->buffered_amount());
-  EXPECT_EQ(3U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-
-  // Unblock and make sure everything was sent.
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(0U, channel_->buffered_amount(), kDefaultTimeout);
-  bytes_sent += bytes_queued;
-  EXPECT_EQ(6U, channel_->messages_sent());
-  EXPECT_EQ(bytes_sent, channel_->bytes_sent());
-}
-
 // Tests that the queued control message is sent when channel is ready.
 TEST_F(SctpDataChannelTest, OpenMessageSent) {
   // Initially the id is unassigned.
@@ -494,16 +234,6 @@
   EXPECT_EQ(controller_->last_sid(), channel_->id());
 }
 
-TEST_F(SctpDataChannelTest, QueuedOpenMessageSent) {
-  controller_->set_send_blocked(true);
-  SetChannelReady();
-  controller_->set_send_blocked(false);
-
-  EXPECT_EQ(DataMessageType::kControl,
-            controller_->last_send_data_params().type);
-  EXPECT_EQ(controller_->last_sid(), channel_->id());
-}
-
 // Tests that the DataChannel created after transport gets ready can enter OPEN
 // state.
 TEST_F(SctpDataChannelTest, LateCreatedChannelTransitionToOpen) {
@@ -619,56 +349,6 @@
   EXPECT_FALSE(controller_->last_send_data_params().ordered);
 }
 
-// Tests that the channel can't open until it's successfully sent the OPEN
-// message.
-TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) {
-  DataBuffer buffer("foo");
-
-  controller_->set_send_blocked(true);
-  SetChannelReady();
-  EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state());
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000);
-  EXPECT_EQ(DataMessageType::kControl,
-            controller_->last_send_data_params().type);
-}
-
-// Tests that close first makes sure all queued data gets sent.
-TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
-  DataBuffer buffer("foo");
-
-  controller_->set_send_blocked(true);
-  SetChannelReady();
-  EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state());
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000);
-  controller_->set_send_blocked(true);
-  channel_->SendAsync(buffer, nullptr);
-  channel_->Close();
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000);
-  EXPECT_TRUE(channel_->error().ok());
-  EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type);
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedQueuedCloseFlushes) {
-  DataBuffer buffer("foo");
-
-  controller_->set_send_blocked(true);
-  SetChannelReady();
-  EXPECT_EQ(DataChannelInterface::kConnecting, channel_->state());
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, channel_->state(), 1000);
-  controller_->set_send_blocked(true);
-  channel_->Send(buffer);
-  channel_->Close();
-  controller_->set_send_blocked(false);
-  EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(), 1000);
-  EXPECT_TRUE(channel_->error().ok());
-  EXPECT_EQ(DataMessageType::kText, controller_->last_send_data_params().type);
-}
-
 // Tests that messages are sent with the right id.
 TEST_F(SctpDataChannelTest, SendDataId) {
   SetChannelSid(inner_channel_, StreamId(1));
@@ -800,59 +480,6 @@
   EXPECT_EQ(InternalDataChannelInit::kNone, init2.open_handshake_role);
 }
 
-// Tests that that Send() returns false if the sending buffer is full
-// and the channel stays open.
-TEST_F(SctpDataChannelTest, OpenWhenSendBufferFull) {
-  AddObserver();
-  SetChannelReady();
-
-  const size_t packetSize = 1024;
-
-  rtc::CopyOnWriteBuffer buffer(packetSize);
-  memset(buffer.MutableData(), 0, buffer.size());
-
-  DataBuffer packet(buffer, true);
-  controller_->set_send_blocked(true);
-  size_t successful_send = 0u, failed_send = 0u;
-  auto send_complete = [&](RTCError err) {
-    err.ok() ? ++successful_send : ++failed_send;
-  };
-
-  size_t count = DataChannelInterface::MaxSendQueueSize() / packetSize;
-  for (size_t i = 0; i < count; ++i) {
-    channel_->SendAsync(packet, send_complete);
-  }
-
-  // The sending buffer should be full, `Send()` returns false.
-  channel_->SendAsync(packet, std::move(send_complete));
-  FlushNetworkThreadAndPendingOperations();
-  EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
-  EXPECT_EQ(successful_send, count);
-  EXPECT_EQ(failed_send, 1u);
-}
-
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedOpenWhenSendBufferFull) {
-  SetChannelReady();
-
-  const size_t packetSize = 1024;
-
-  rtc::CopyOnWriteBuffer buffer(packetSize);
-  memset(buffer.MutableData(), 0, buffer.size());
-
-  DataBuffer packet(buffer, true);
-  controller_->set_send_blocked(true);
-
-  for (size_t i = 0; i < DataChannelInterface::MaxSendQueueSize() / packetSize;
-       ++i) {
-    EXPECT_TRUE(channel_->Send(packet));
-  }
-
-  // The sending buffer should be full, `Send()` returns false.
-  EXPECT_FALSE(channel_->Send(packet));
-  EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
-}
-
 // Tests that the DataChannel is closed on transport errors.
 TEST_F(SctpDataChannelTest, ClosedOnTransportError) {
   SetChannelReady();
@@ -943,12 +570,11 @@
   AddObserver();
   SetChannelReady();
 
-  rtc::CopyOnWriteBuffer buffer(1024);
+  rtc::CopyOnWriteBuffer buffer(100 * 1024);
   memset(buffer.MutableData(), 0, buffer.size());
   DataBuffer packet(buffer, true);
 
-  // Send a packet while sending is blocked so it ends up buffered.
-  controller_->set_send_blocked(true);
+  // Send a very large packet, forcing the message to become buffered.
   channel_->SendAsync(packet, nullptr);
 
   // Tell the data channel that its transport is being destroyed.
@@ -966,33 +592,6 @@
   EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail());
 }
 
-// TODO(tommi): This test uses `Send()`. Remove once fully deprecated.
-TEST_F(SctpDataChannelTest, DeprecatedTransportDestroyedWhileDataBuffered) {
-  SetChannelReady();
-
-  rtc::CopyOnWriteBuffer buffer(1024);
-  memset(buffer.MutableData(), 0, buffer.size());
-  DataBuffer packet(buffer, true);
-
-  // Send a packet while sending is blocked so it ends up buffered.
-  controller_->set_send_blocked(true);
-  EXPECT_TRUE(channel_->Send(packet));
-
-  // Tell the data channel that its transport is being destroyed.
-  // It should then stop using the transport (allowing us to delete it) and
-  // transition to the "closed" state.
-  RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
-  error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
-  network_thread_.BlockingCall(
-      [&] { inner_channel_->OnTransportChannelClosed(error); });
-  controller_.reset(nullptr);
-  EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
-                 kDefaultTimeout);
-  EXPECT_FALSE(channel_->error().ok());
-  EXPECT_EQ(RTCErrorType::OPERATION_ERROR_WITH_DATA, channel_->error().type());
-  EXPECT_EQ(RTCErrorDetailType::SCTP_FAILURE, channel_->error().error_detail());
-}
-
 TEST_F(SctpDataChannelTest, TransportGotErrorCode) {
   SetChannelReady();
 
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index e496382..6956cff 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -485,11 +485,10 @@
 
 uint64_t SctpDataChannel::buffered_amount() const {
   RTC_DCHECK_RUN_ON(network_thread_);
-  uint64_t buffered_amount = queued_send_data_.byte_count();
   if (controller_ != nullptr && id_n_.has_value()) {
-    buffered_amount += controller_->buffered_amount(*id_n_);
+    return controller_->buffered_amount(*id_n_);
   }
-  return buffered_amount;
+  return 0u;
 }
 
 void SctpDataChannel::Close() {
@@ -578,20 +577,14 @@
 
 // RTC_RUN_ON(network_thread_);
 RTCError SctpDataChannel::SendImpl(DataBuffer buffer) {
+  // The caller increases the cached `bufferedAmount` even if there are errors.
+  expected_buffer_amount_ += buffer.size();
+
   if (state_ != kOpen) {
     error_ = RTCError(RTCErrorType::INVALID_STATE);
     return error_;
   }
 
-  // If the queue is non-empty, we're waiting for SignalReadyToSend,
-  // so just add to the end of the queue and keep waiting.
-  if (!queued_send_data_.Empty()) {
-    error_ = QueueSendDataMessage(buffer)
-                 ? RTCError::OK()
-                 : RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
-    return error_;
-  }
-
   return SendDataMessage(buffer, true);
 }
 
@@ -629,8 +622,11 @@
     // Don't bother sending queued data since the side that initiated the
     // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
     // discussion about this.
-    queued_send_data_.Clear();
-    queued_control_data_.Clear();
+
+    // Note that this is handled by the SctpTransport, when an incoming stream
+    // reset notification comes in, the outgoing stream is closed, which
+    // discards data.
+
     // Just need to change state to kClosing, SctpTransport will handle the
     // rest of the closing procedure and OnClosingProcedureComplete will be
     // called later.
@@ -644,7 +640,9 @@
   // If the closing procedure is complete, we should have finished sending
   // all pending data and transitioned to kClosing already.
   RTC_DCHECK_EQ(state_, kClosing);
-  RTC_DCHECK(queued_send_data_.Empty());
+  if (controller_ && id_n_.has_value()) {
+    RTC_DCHECK_EQ(controller_->buffered_amount(*id_n_), 0);
+  }
   SetState(kClosed);
 }
 
@@ -664,6 +662,7 @@
 
 void SctpDataChannel::OnBufferedAmountLow() {
   RTC_DCHECK_RUN_ON(network_thread_);
+  MaybeSendOnBufferedAmountChanged();
 }
 
 DataChannelStats SctpDataChannel::GetStats() const {
@@ -739,9 +738,6 @@
   RTC_DCHECK(connected_to_transport());
   RTC_DCHECK(id_n_.has_value());
 
-  SendQueuedControlMessages();
-  SendQueuedDataMessages();
-
   UpdateState();
 }
 
@@ -754,10 +750,6 @@
 
   network_safety_->SetNotAlive();
 
-  // Closing abruptly means any queued data gets thrown away.
-  queued_send_data_.Clear();
-  queued_control_data_.Clear();
-
   // Still go to "kClosing" before "kClosed", since observers may be expecting
   // that.
   SetState(kClosing);
@@ -810,10 +802,10 @@
       break;
     }
     case kClosing: {
-      if (connected_to_transport() && controller_) {
+      if (connected_to_transport() && controller_ && id_n_.has_value()) {
         // Wait for all queued data to be sent before beginning the closing
         // procedure.
-        if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
+        if (controller_->buffered_amount(*id_n_) == 0) {
           // For SCTP data channels, we need to wait for the closing procedure
           // to complete; after calling RemoveSctpDataStream,
           // OnClosingProcedureComplete will end up called asynchronously
@@ -826,8 +818,6 @@
       } else {
         // When we're not connected to a transport, we'll transition
         // directly to the `kClosed` state from here.
-        queued_send_data_.Clear();
-        queued_control_data_.Clear();
         SetState(kClosed);
       }
       break;
@@ -866,22 +856,50 @@
   }
 }
 
-// RTC_RUN_ON(network_thread_).
-void SctpDataChannel::SendQueuedDataMessages() {
-  if (queued_send_data_.Empty()) {
+// RTC_RUN_ON(network_thread_)
+void SctpDataChannel::MaybeSendOnBufferedAmountChanged() {
+  // The `buffered_amount` in the signaling thread (RTCDataChannel in Blink)
+  // has a cached variant of the SCTP socket's buffered_amount, which it
+  // increases for every data sent and decreased when `OnBufferedAmountChange`
+  // is sent.
+  //
+  // To ensure it's consistent, this object maintains its own view of that value
+  // and if it changes with a reasonable amount (10kb, or down to zero), send
+  // the `OnBufferedAmountChange` to update the caller's cached variable.
+  if (!controller_ || !id_n_.has_value() || !observer_) {
     return;
   }
 
-  RTC_DCHECK(state_ == kOpen || state_ == kClosing);
-
-  while (!queued_send_data_.Empty()) {
-    std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
-    if (!SendDataMessage(*buffer, false).ok()) {
-      // Return the message to the front of the queue if sending is aborted.
-      queued_send_data_.PushFront(std::move(buffer));
-      break;
-    }
+  // This becomes the resolution of how often the bufferedAmount is updated on
+  // the signaling thread and exists to avoid doing cross-thread communication
+  // too often. On benchmarks, Chrome handle around 300Mbps, which with this
+  // size results in a rate of ~400 updates per second - a reasonable number.
+  static constexpr int64_t kMinBufferedAmountDiffToTriggerCallback = 100 * 1024;
+  size_t actual_buffer_amount = controller_->buffered_amount(*id_n_);
+  if (actual_buffer_amount > expected_buffer_amount_) {
+    RTC_DLOG(LS_ERROR) << "Actual buffer_amount larger than expected";
+    return;
   }
+
+  // Fire OnBufferedAmountChange to decrease the cached view if it represents a
+  // big enough change (to reduce the frequency of cross-thread communication),
+  // or if it reaches zero.
+  if ((actual_buffer_amount == 0 && expected_buffer_amount_ != 0) ||
+      (expected_buffer_amount_ - actual_buffer_amount >
+       kMinBufferedAmountDiffToTriggerCallback)) {
+    uint64_t diff = expected_buffer_amount_ - actual_buffer_amount;
+    expected_buffer_amount_ = actual_buffer_amount;
+    observer_->OnBufferedAmountChange(diff);
+  }
+
+  // The threshold is always updated to ensure it's lower than what it's now.
+  // This ensures that this function will be called again, until the channel is
+  // completely drained.
+  controller_->SetBufferedAmountLowThreshold(
+      *id_n_,
+      actual_buffer_amount > kMinBufferedAmountDiffToTriggerCallback
+          ? actual_buffer_amount - kMinBufferedAmountDiffToTriggerCallback
+          : 0);
 }
 
 // RTC_RUN_ON(network_thread_).
@@ -908,25 +926,13 @@
       buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
 
   error_ = controller_->SendData(*id_n_, send_params, buffer.data);
+  MaybeSendOnBufferedAmountChanged();
   if (error_.ok()) {
     ++messages_sent_;
     bytes_sent_ += buffer.size();
-
-    if (observer_ && buffer.size() > 0) {
-      observer_->OnBufferedAmountChange(buffer.size());
-    }
     return error_;
   }
 
-  if (error_.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
-    if (!queue_if_blocked)
-      return error_;
-
-    if (QueueSendDataMessage(buffer)) {
-      error_ = RTCError::OK();
-      return error_;
-    }
-  }
   // Close the channel if the error is not SDR_BLOCK, or if queuing the
   // message failed.
   RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send data, "
@@ -939,30 +945,6 @@
 }
 
 // RTC_RUN_ON(network_thread_).
-bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
-  size_t start_buffered_amount = queued_send_data_.byte_count();
-  if (start_buffered_amount + buffer.size() >
-      DataChannelInterface::MaxSendQueueSize()) {
-    RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
-    error_ = RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
-    return false;
-  }
-  queued_send_data_.PushBack(std::make_unique<DataBuffer>(buffer));
-  return true;
-}
-
-// RTC_RUN_ON(network_thread_).
-void SctpDataChannel::SendQueuedControlMessages() {
-  PacketQueue control_packets;
-  control_packets.Swap(&queued_control_data_);
-
-  while (!control_packets.Empty()) {
-    std::unique_ptr<DataBuffer> buf = control_packets.PopFront();
-    SendControlMessage(buf->data);
-  }
-}
-
-// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
   RTC_DCHECK(connected_to_transport());
   RTC_DCHECK(id_n_.has_value());
@@ -988,8 +970,6 @@
     } else if (handshake_state_ == kHandshakeShouldSendOpen) {
       handshake_state_ = kHandshakeWaitingForAck;
     }
-  } else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
-    queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
   } else {
     RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
                          " the CONTROL message, send_result = "
diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h
index fcd088c..e1bd461 100644
--- a/pc/sctp_data_channel.h
+++ b/pc/sctp_data_channel.h
@@ -257,19 +257,16 @@
 
   void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
 
-  void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
   RTCError SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
       RTC_RUN_ON(network_thread_);
-  bool QueueSendDataMessage(const DataBuffer& buffer)
-      RTC_RUN_ON(network_thread_);
 
-  void SendQueuedControlMessages() RTC_RUN_ON(network_thread_);
   bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
       RTC_RUN_ON(network_thread_);
 
   bool connected_to_transport() const RTC_RUN_ON(network_thread_) {
     return network_safety_->alive();
   }
+  void MaybeSendOnBufferedAmountChanged() RTC_RUN_ON(network_thread_);
 
   rtc::Thread* const signaling_thread_;
   rtc::Thread* const network_thread_;
@@ -283,6 +280,8 @@
   const absl::optional<Priority> priority_;
   const bool negotiated_;
   const bool ordered_;
+  // See the body of `MaybeSendOnBufferedAmountChanged`.
+  size_t expected_buffer_amount_ = 0;
 
   DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr;
   std::unique_ptr<ObserverAdapter> observer_adapter_;
@@ -298,11 +297,7 @@
       kHandshakeInit;
   // Did we already start the graceful SCTP closing procedure?
   bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
-  // Control messages that always have to get sent out before any queued
-  // data.
-  PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
   PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
-  PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
   rtc::scoped_refptr<PendingTaskSafetyFlag> network_safety_ =
       PendingTaskSafetyFlag::CreateDetachedInactive();
 };