Added explicit EOR to sctp messages and coalesce messages on the receiving side.
TBR=pthatcher@webrtc.org
Bug: webrtc:7774
Change-Id: I41d1cd98d1e7b2ad479177eb2e328a5e2c704824
Reviewed-on: https://webrtc-review.googlesource.com/88900
Commit-Queue: Jeroen de Borst <jeroendb@webrtc.org>
Reviewed-by: Qingsi Wang <qingsi@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24031}
diff --git a/media/sctp/sctptransport.cc b/media/sctp/sctptransport.cc
index 967e315..eaaf63f 100644
--- a/media/sctp/sctptransport.cc
+++ b/media/sctp/sctptransport.cc
@@ -291,22 +291,44 @@
// It's neither a notification nor a recognized data packet. Drop it.
RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid
<< " on an SCTP packet. Dropping.";
+ free(data);
} else {
- rtc::CopyOnWriteBuffer buffer;
ReceiveDataParams params;
- buffer.SetData(reinterpret_cast<uint8_t*>(data), length);
+
+ // Expect only continuation messages belonging to the same sid, usrsctp
+ // ensures this.
+ RTC_CHECK(transport->partial_message_.size() == 0 ||
+ rcv.rcv_sid == transport->partial_message_sid_);
+
+ transport->partial_message_.AppendData(reinterpret_cast<uint8_t*>(data),
+ length);
+ transport->partial_message_sid_ = rcv.rcv_sid;
+
+ free(data);
+
+ // Merge partial messages until they exceed the maximum send buffer size.
+ // This enables messages from a single send to be delivered in a single
+ // callback. Larger messages (originating from other implementations) will
+ // still be delivered in chunks.
+ if (!(flags & MSG_EOR) &&
+ (transport->partial_message_.size() < kSendBufferSize)) {
+ return 1;
+ }
+
params.sid = rcv.rcv_sid;
params.seq_num = rcv.rcv_ssn;
params.timestamp = rcv.rcv_tsn;
params.type = type;
+
// The ownership of the packet transfers to |invoker_|. Using
// CopyOnWriteBuffer is the most convenient way to do this.
transport->invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, transport->network_thread_,
rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport,
- transport, buffer, params, flags));
+ transport, transport->partial_message_, params, flags));
+
+ transport->partial_message_.Clear();
}
- free(data);
return 1;
}
@@ -489,6 +511,7 @@
spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
spa.sendv_sndinfo.snd_sid = params.sid;
spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type));
+ spa.sendv_sndinfo.snd_flags |= SCTP_EOR;
// Ordered implies reliable.
if (!params.ordered) {
@@ -694,6 +717,15 @@
return false;
}
+ // Explicit EOR.
+ uint32_t eor = 1;
+ if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &eor,
+ sizeof(eor))) {
+ RTC_LOG_ERRNO(LS_ERROR) << debug_name_ << "->ConfigureSctpSocket(): "
+ << "Failed to set SCTP_EXPLICIT_EOR.";
+ return false;
+ }
+
// Subscribe to SCTP event notifications.
int event_types[] = {SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
SCTP_SEND_FAILED_EVENT, SCTP_SENDER_DRY_EVENT,
diff --git a/media/sctp/sctptransport.h b/media/sctp/sctptransport.h
index b2d7084..aad060a 100644
--- a/media/sctp/sctptransport.h
+++ b/media/sctp/sctptransport.h
@@ -142,6 +142,12 @@
rtc::AsyncInvoker invoker_;
// Underlying DTLS channel.
rtc::PacketTransportInternal* transport_ = nullptr;
+
+ // Track the data received from usrsctp between callbacks until the EOR bit
+ // arrives.
+ rtc::CopyOnWriteBuffer partial_message_;
+ int partial_message_sid_;
+
bool was_ever_writable_ = false;
int local_port_ = kSctpDefaultPort;
int remote_port_ = kSctpDefaultPort;
diff --git a/pc/peerconnectionendtoend_unittest.cc b/pc/peerconnectionendtoend_unittest.cc
index 54d967f..7a8c2db 100644
--- a/pc/peerconnectionendtoend_unittest.cc
+++ b/pc/peerconnectionendtoend_unittest.cc
@@ -47,7 +47,7 @@
namespace {
-const int kMaxWait = 10000;
+const int kMaxWait = 25000;
} // namespace
@@ -139,23 +139,40 @@
// Tests that |dc1| and |dc2| can send to and receive from each other.
void TestDataChannelSendAndReceive(DataChannelInterface* dc1,
- DataChannelInterface* dc2) {
+ DataChannelInterface* dc2,
+ size_t size = 6) {
std::unique_ptr<webrtc::MockDataChannelObserver> dc1_observer(
new webrtc::MockDataChannelObserver(dc1));
std::unique_ptr<webrtc::MockDataChannelObserver> dc2_observer(
new webrtc::MockDataChannelObserver(dc2));
- static const std::string kDummyData = "abcdefg";
- webrtc::DataBuffer buffer(kDummyData);
+ static const std::string kDummyData =
+ "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
+ webrtc::DataBuffer buffer("");
+
+ size_t sizeLeft = size;
+ while (sizeLeft > 0) {
+ size_t chunkSize =
+ sizeLeft > kDummyData.length() ? kDummyData.length() : sizeLeft;
+ buffer.data.AppendData(kDummyData.data(), chunkSize);
+ sizeLeft -= chunkSize;
+ }
+
EXPECT_TRUE(dc1->Send(buffer));
- EXPECT_EQ_WAIT(kDummyData, dc2_observer->last_message(), kMaxWait);
+ EXPECT_EQ_WAIT(buffer.data,
+ rtc::CopyOnWriteBuffer(dc2_observer->last_message()),
+ kMaxWait);
EXPECT_TRUE(dc2->Send(buffer));
- EXPECT_EQ_WAIT(kDummyData, dc1_observer->last_message(), kMaxWait);
+ EXPECT_EQ_WAIT(buffer.data,
+ rtc::CopyOnWriteBuffer(dc1_observer->last_message()),
+ kMaxWait);
EXPECT_EQ(1U, dc1_observer->received_message_count());
+ EXPECT_EQ(size, dc1_observer->last_message().length());
EXPECT_EQ(1U, dc2_observer->received_message_count());
+ EXPECT_EQ(size, dc2_observer->last_message().length());
}
void WaitForDataChannelsToOpen(DataChannelInterface* local_dc,
@@ -524,6 +541,40 @@
CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
}
+// Verifies that a DataChannel created can transfer large messages.
+TEST_P(PeerConnectionEndToEndTest, CreateDataChannelLargeTransfer) {
+ CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
+ webrtc::MockAudioDecoderFactory::CreateEmptyFactory());
+
+ webrtc::DataChannelInit init;
+
+ // This DataChannel is for creating the data content in the negotiation.
+ rtc::scoped_refptr<DataChannelInterface> dummy(
+ caller_->CreateDataChannel("data", init));
+ Negotiate();
+ WaitForConnection();
+
+ // Wait for the data channel created pre-negotiation to be opened.
+ WaitForDataChannelsToOpen(dummy, callee_signaled_data_channels_, 0);
+
+ // Create new DataChannels after the negotiation and verify their states.
+ rtc::scoped_refptr<DataChannelInterface> caller_dc(
+ caller_->CreateDataChannel("hello", init));
+ rtc::scoped_refptr<DataChannelInterface> callee_dc(
+ callee_->CreateDataChannel("hello", init));
+
+ WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
+ WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0);
+
+ TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1],
+ 256 * 1024);
+ TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0],
+ 256 * 1024);
+
+ CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
+ CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
+}
+
// Verifies that DataChannel IDs are even/odd based on the DTLS roles.
TEST_P(PeerConnectionEndToEndTest, DataChannelIdAssignment) {
CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
diff --git a/rtc_base/copyonwritebuffer.cc b/rtc_base/copyonwritebuffer.cc
index 8874ea9..6c48d52 100644
--- a/rtc_base/copyonwritebuffer.cc
+++ b/rtc_base/copyonwritebuffer.cc
@@ -22,6 +22,9 @@
CopyOnWriteBuffer::CopyOnWriteBuffer(CopyOnWriteBuffer&& buf)
: buffer_(std::move(buf.buffer_)) {}
+CopyOnWriteBuffer::CopyOnWriteBuffer(const std::string& s)
+ : CopyOnWriteBuffer(s.data(), s.length()) {}
+
CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size)
: buffer_(size > 0 ? new RefCountedObject<Buffer>(size) : nullptr) {
RTC_DCHECK(IsConsistent());
diff --git a/rtc_base/copyonwritebuffer.h b/rtc_base/copyonwritebuffer.h
index 467baad..0514e2f 100644
--- a/rtc_base/copyonwritebuffer.h
+++ b/rtc_base/copyonwritebuffer.h
@@ -31,6 +31,9 @@
// Move contents from an existing buffer.
CopyOnWriteBuffer(CopyOnWriteBuffer&& buf);
+ // Construct a buffer from a string, convenient for unittests.
+ CopyOnWriteBuffer(const std::string& s);
+
// Construct a buffer with the specified number of uninitialized bytes.
explicit CopyOnWriteBuffer(size_t size);
CopyOnWriteBuffer(size_t size, size_t capacity);