Added explicit EOR to sctp messages and coalesce messages on the receiving side.

TBR=pthatcher@webrtc.org

Bug: webrtc:7774
Change-Id: I41d1cd98d1e7b2ad479177eb2e328a5e2c704824
Reviewed-on: https://webrtc-review.googlesource.com/88900
Commit-Queue: Jeroen de Borst <jeroendb@webrtc.org>
Reviewed-by: Qingsi Wang <qingsi@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24031}
diff --git a/media/sctp/sctptransport.cc b/media/sctp/sctptransport.cc
index 967e315..eaaf63f 100644
--- a/media/sctp/sctptransport.cc
+++ b/media/sctp/sctptransport.cc
@@ -291,22 +291,44 @@
       // It's neither a notification nor a recognized data packet.  Drop it.
       RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid
                         << " on an SCTP packet.  Dropping.";
+      free(data);
     } else {
-      rtc::CopyOnWriteBuffer buffer;
       ReceiveDataParams params;
-      buffer.SetData(reinterpret_cast<uint8_t*>(data), length);
+
+      // Expect only continuation messages belonging to the same sid, usrsctp
+      // ensures this.
+      RTC_CHECK(transport->partial_message_.size() == 0 ||
+                rcv.rcv_sid == transport->partial_message_sid_);
+
+      transport->partial_message_.AppendData(reinterpret_cast<uint8_t*>(data),
+                                             length);
+      transport->partial_message_sid_ = rcv.rcv_sid;
+
+      free(data);
+
+      // Merge partial messages until they exceed the maximum send buffer size.
+      // This enables messages from a single send to be delivered in a single
+      // callback. Larger messages (originating from other implementations) will
+      // still be delivered in chunks.
+      if (!(flags & MSG_EOR) &&
+          (transport->partial_message_.size() < kSendBufferSize)) {
+        return 1;
+      }
+
       params.sid = rcv.rcv_sid;
       params.seq_num = rcv.rcv_ssn;
       params.timestamp = rcv.rcv_tsn;
       params.type = type;
+
       // The ownership of the packet transfers to |invoker_|. Using
       // CopyOnWriteBuffer is the most convenient way to do this.
       transport->invoker_.AsyncInvoke<void>(
           RTC_FROM_HERE, transport->network_thread_,
           rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport,
-                    transport, buffer, params, flags));
+                    transport, transport->partial_message_, params, flags));
+
+      transport->partial_message_.Clear();
     }
-    free(data);
     return 1;
   }
 
@@ -489,6 +511,7 @@
   spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
   spa.sendv_sndinfo.snd_sid = params.sid;
   spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type));
+  spa.sendv_sndinfo.snd_flags |= SCTP_EOR;
 
   // Ordered implies reliable.
   if (!params.ordered) {
@@ -694,6 +717,15 @@
     return false;
   }
 
+  // Explicit EOR.
+  uint32_t eor = 1;
+  if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &eor,
+                         sizeof(eor))) {
+    RTC_LOG_ERRNO(LS_ERROR) << debug_name_ << "->ConfigureSctpSocket(): "
+                            << "Failed to set SCTP_EXPLICIT_EOR.";
+    return false;
+  }
+
   // Subscribe to SCTP event notifications.
   int event_types[] = {SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
                        SCTP_SEND_FAILED_EVENT, SCTP_SENDER_DRY_EVENT,
diff --git a/media/sctp/sctptransport.h b/media/sctp/sctptransport.h
index b2d7084..aad060a 100644
--- a/media/sctp/sctptransport.h
+++ b/media/sctp/sctptransport.h
@@ -142,6 +142,12 @@
   rtc::AsyncInvoker invoker_;
   // Underlying DTLS channel.
   rtc::PacketTransportInternal* transport_ = nullptr;
+
+  // Track the data received from usrsctp between callbacks until the EOR bit
+  // arrives.
+  rtc::CopyOnWriteBuffer partial_message_;
+  int partial_message_sid_;
+
   bool was_ever_writable_ = false;
   int local_port_ = kSctpDefaultPort;
   int remote_port_ = kSctpDefaultPort;
diff --git a/pc/peerconnectionendtoend_unittest.cc b/pc/peerconnectionendtoend_unittest.cc
index 54d967f..7a8c2db 100644
--- a/pc/peerconnectionendtoend_unittest.cc
+++ b/pc/peerconnectionendtoend_unittest.cc
@@ -47,7 +47,7 @@
 
 namespace {
 
-const int kMaxWait = 10000;
+const int kMaxWait = 25000;
 
 }  // namespace
 
@@ -139,23 +139,40 @@
 
   // Tests that |dc1| and |dc2| can send to and receive from each other.
   void TestDataChannelSendAndReceive(DataChannelInterface* dc1,
-                                     DataChannelInterface* dc2) {
+                                     DataChannelInterface* dc2,
+                                     size_t size = 6) {
     std::unique_ptr<webrtc::MockDataChannelObserver> dc1_observer(
         new webrtc::MockDataChannelObserver(dc1));
 
     std::unique_ptr<webrtc::MockDataChannelObserver> dc2_observer(
         new webrtc::MockDataChannelObserver(dc2));
 
-    static const std::string kDummyData = "abcdefg";
-    webrtc::DataBuffer buffer(kDummyData);
+    static const std::string kDummyData =
+        "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+    webrtc::DataBuffer buffer("");
+
+    size_t sizeLeft = size;
+    while (sizeLeft > 0) {
+      size_t chunkSize =
+          sizeLeft > kDummyData.length() ? kDummyData.length() : sizeLeft;
+      buffer.data.AppendData(kDummyData.data(), chunkSize);
+      sizeLeft -= chunkSize;
+    }
+
     EXPECT_TRUE(dc1->Send(buffer));
-    EXPECT_EQ_WAIT(kDummyData, dc2_observer->last_message(), kMaxWait);
+    EXPECT_EQ_WAIT(buffer.data,
+                   rtc::CopyOnWriteBuffer(dc2_observer->last_message()),
+                   kMaxWait);
 
     EXPECT_TRUE(dc2->Send(buffer));
-    EXPECT_EQ_WAIT(kDummyData, dc1_observer->last_message(), kMaxWait);
+    EXPECT_EQ_WAIT(buffer.data,
+                   rtc::CopyOnWriteBuffer(dc1_observer->last_message()),
+                   kMaxWait);
 
     EXPECT_EQ(1U, dc1_observer->received_message_count());
+    EXPECT_EQ(size, dc1_observer->last_message().length());
     EXPECT_EQ(1U, dc2_observer->received_message_count());
+    EXPECT_EQ(size, dc2_observer->last_message().length());
   }
 
   void WaitForDataChannelsToOpen(DataChannelInterface* local_dc,
@@ -524,6 +541,40 @@
   CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
 }
 
+// Verifies that a DataChannel created can transfer large messages.
+TEST_P(PeerConnectionEndToEndTest, CreateDataChannelLargeTransfer) {
+  CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
+            webrtc::MockAudioDecoderFactory::CreateEmptyFactory());
+
+  webrtc::DataChannelInit init;
+
+  // This DataChannel is for creating the data content in the negotiation.
+  rtc::scoped_refptr<DataChannelInterface> dummy(
+      caller_->CreateDataChannel("data", init));
+  Negotiate();
+  WaitForConnection();
+
+  // Wait for the data channel created pre-negotiation to be opened.
+  WaitForDataChannelsToOpen(dummy, callee_signaled_data_channels_, 0);
+
+  // Create new DataChannels after the negotiation and verify their states.
+  rtc::scoped_refptr<DataChannelInterface> caller_dc(
+      caller_->CreateDataChannel("hello", init));
+  rtc::scoped_refptr<DataChannelInterface> callee_dc(
+      callee_->CreateDataChannel("hello", init));
+
+  WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
+  WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0);
+
+  TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1],
+                                256 * 1024);
+  TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0],
+                                256 * 1024);
+
+  CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
+  CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
+}
+
 // Verifies that DataChannel IDs are even/odd based on the DTLS roles.
 TEST_P(PeerConnectionEndToEndTest, DataChannelIdAssignment) {
   CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
diff --git a/rtc_base/copyonwritebuffer.cc b/rtc_base/copyonwritebuffer.cc
index 8874ea9..6c48d52 100644
--- a/rtc_base/copyonwritebuffer.cc
+++ b/rtc_base/copyonwritebuffer.cc
@@ -22,6 +22,9 @@
 CopyOnWriteBuffer::CopyOnWriteBuffer(CopyOnWriteBuffer&& buf)
     : buffer_(std::move(buf.buffer_)) {}
 
+CopyOnWriteBuffer::CopyOnWriteBuffer(const std::string& s)
+    : CopyOnWriteBuffer(s.data(), s.length()) {}
+
 CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size)
     : buffer_(size > 0 ? new RefCountedObject<Buffer>(size) : nullptr) {
   RTC_DCHECK(IsConsistent());
diff --git a/rtc_base/copyonwritebuffer.h b/rtc_base/copyonwritebuffer.h
index 467baad..0514e2f 100644
--- a/rtc_base/copyonwritebuffer.h
+++ b/rtc_base/copyonwritebuffer.h
@@ -31,6 +31,9 @@
   // Move contents from an existing buffer.
   CopyOnWriteBuffer(CopyOnWriteBuffer&& buf);
 
+  // Construct a buffer from a string, convenient for unittests.
+  CopyOnWriteBuffer(const std::string& s);
+
   // Construct a buffer with the specified number of uninitialized bytes.
   explicit CopyOnWriteBuffer(size_t size);
   CopyOnWriteBuffer(size_t size, size_t capacity);