dcsctp: Use rtc::CopyOnWriteBuffer
This avoids copying the payload at all. Future CL will change the
transport.
In performance tests, memcpy was visible in the performance profiles
prior to this change.
Bug: webrtc:12943
Change-Id: I507a1a316165db748e73cf0d58c1be62cc76a2d2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/236346
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35428}
diff --git a/net/dcsctp/fuzzers/dcsctp_fuzzers.cc b/net/dcsctp/fuzzers/dcsctp_fuzzers.cc
index b4b6224..dbb9f7b 100644
--- a/net/dcsctp/fuzzers/dcsctp_fuzzers.cc
+++ b/net/dcsctp/fuzzers/dcsctp_fuzzers.cc
@@ -29,6 +29,7 @@
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/socket/dcsctp_socket.h"
#include "net/dcsctp/socket/state_cookie.h"
+#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/logging.h"
namespace dcsctp {
@@ -167,9 +168,9 @@
options.is_unordered = IsUnordered(state.GetByte() != 0);
options.is_beginning = Data::IsBeginning(state.GetByte() != 0);
options.is_end = Data::IsEnd(state.GetByte() != 0);
+ rtc::CopyOnWriteBuffer payload(10);
b.Add(DataChunk(state.GetNextTSN(), StreamID(state.GetByte()),
- SSN(state.GetByte()), PPID(53), std::vector<uint8_t>(10),
- options));
+ SSN(state.GetByte()), PPID(53), payload, options));
}
void MakeInitChunk(FuzzState& state, SctpPacket::Builder& b) {
@@ -284,7 +285,7 @@
options.is_end = Data::IsEnd(state.GetByte() != 0);
b.Add(IDataChunk(state.GetNextTSN(), StreamID(state.GetByte()),
state.GetNextMID(), PPID(53), FSN(0),
- std::vector<uint8_t>(10), options));
+ rtc::CopyOnWriteBuffer(10), options));
}
void MakeIForwardTsnChunk(FuzzState& state, SctpPacket::Builder& b) {
diff --git a/net/dcsctp/packet/chunk/data_chunk.cc b/net/dcsctp/packet/chunk/data_chunk.cc
index 769be2d..3ee9cfa 100644
--- a/net/dcsctp/packet/chunk/data_chunk.cc
+++ b/net/dcsctp/packet/chunk/data_chunk.cc
@@ -20,6 +20,8 @@
#include "net/dcsctp/packet/bounded_byte_reader.h"
#include "net/dcsctp/packet/bounded_byte_writer.h"
#include "net/dcsctp/packet/chunk/data_common.h"
+#include "rtc_base/copy_on_write_buffer.h"
+#include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h"
namespace dcsctp {
@@ -64,9 +66,7 @@
ImmediateAckFlag((flags & (1 << kFlagsBitImmediateAck)) != 0);
return DataChunk(tsn, stream_identifier, ssn, ppid,
- std::vector<uint8_t>(reader->variable_data().begin(),
- reader->variable_data().end()),
- options);
+ rtc::CopyOnWriteBuffer(reader->variable_data()), options);
}
void DataChunk::SerializeTo(std::vector<uint8_t>& out) const {
diff --git a/net/dcsctp/packet/chunk/data_chunk.h b/net/dcsctp/packet/chunk/data_chunk.h
index 12bb05f..22fb7b4 100644
--- a/net/dcsctp/packet/chunk/data_chunk.h
+++ b/net/dcsctp/packet/chunk/data_chunk.h
@@ -23,6 +23,7 @@
#include "net/dcsctp/packet/chunk/data_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/tlv_trait.h"
+#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@@ -45,7 +46,7 @@
StreamID stream_id,
SSN ssn,
PPID ppid,
- std::vector<uint8_t> payload,
+ rtc::CopyOnWriteBuffer payload,
const Options& options)
: AnyDataChunk(tsn,
stream_id,
diff --git a/net/dcsctp/packet/chunk/data_chunk_test.cc b/net/dcsctp/packet/chunk/data_chunk_test.cc
index def99ceb..1d753a0 100644
--- a/net/dcsctp/packet/chunk/data_chunk_test.cc
+++ b/net/dcsctp/packet/chunk/data_chunk_test.cc
@@ -15,6 +15,7 @@
#include "api/array_view.h"
#include "net/dcsctp/testing/testing_macros.h"
+#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
@@ -51,8 +52,9 @@
}
TEST(DataChunkTest, SerializeAndDeserialize) {
+ rtc::CopyOnWriteBuffer payload({1, 2, 3, 4, 5});
DataChunk chunk(TSN(123), StreamID(456), SSN(789), PPID(9090),
- /*payload=*/{1, 2, 3, 4, 5},
+ std::move(payload),
/*options=*/{});
std::vector<uint8_t> serialized;
diff --git a/net/dcsctp/packet/chunk/data_common.h b/net/dcsctp/packet/chunk/data_common.h
index b67efee..ef2696a 100644
--- a/net/dcsctp/packet/chunk/data_common.h
+++ b/net/dcsctp/packet/chunk/data_common.h
@@ -17,6 +17,7 @@
#include "api/array_view.h"
#include "net/dcsctp/packet/chunk/chunk.h"
#include "net/dcsctp/packet/data.h"
+#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@@ -62,7 +63,7 @@
MID message_id,
FSN fsn,
PPID ppid,
- std::vector<uint8_t> payload,
+ rtc::CopyOnWriteBuffer payload,
const Options& options)
: tsn_(tsn),
data_(stream_id,
diff --git a/net/dcsctp/packet/chunk/idata_chunk.cc b/net/dcsctp/packet/chunk/idata_chunk.cc
index 378c527..a48f600 100644
--- a/net/dcsctp/packet/chunk/idata_chunk.cc
+++ b/net/dcsctp/packet/chunk/idata_chunk.cc
@@ -68,9 +68,7 @@
return IDataChunk(tsn, stream_identifier, message_id,
PPID(options.is_beginning ? ppid_or_fsn : 0),
FSN(options.is_beginning ? 0 : ppid_or_fsn),
- std::vector<uint8_t>(reader->variable_data().begin(),
- reader->variable_data().end()),
- options);
+ rtc::CopyOnWriteBuffer(reader->variable_data()), options);
}
void IDataChunk::SerializeTo(std::vector<uint8_t>& out) const {
diff --git a/net/dcsctp/packet/chunk/idata_chunk.h b/net/dcsctp/packet/chunk/idata_chunk.h
index 8cdf2a1..af394c3 100644
--- a/net/dcsctp/packet/chunk/idata_chunk.h
+++ b/net/dcsctp/packet/chunk/idata_chunk.h
@@ -23,6 +23,7 @@
#include "net/dcsctp/packet/chunk/data_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/tlv_trait.h"
+#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@@ -45,7 +46,7 @@
MID message_id,
PPID ppid,
FSN fsn,
- std::vector<uint8_t> payload,
+ rtc::CopyOnWriteBuffer payload,
const Options& options)
: AnyDataChunk(tsn,
stream_id,
diff --git a/net/dcsctp/packet/chunk/idata_chunk_test.cc b/net/dcsctp/packet/chunk/idata_chunk_test.cc
index fea492d..e5d4b8e 100644
--- a/net/dcsctp/packet/chunk/idata_chunk_test.cc
+++ b/net/dcsctp/packet/chunk/idata_chunk_test.cc
@@ -52,8 +52,8 @@
TEST(IDataChunkTest, AtBeginningSerializeAndDeserialize) {
IDataChunk::Options options;
options.is_beginning = Data::IsBeginning(true);
- IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(53), FSN(0), {1},
- options);
+ IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(53), FSN(0),
+ rtc::CopyOnWriteBuffer({1, 2, 4}), options);
std::vector<uint8_t> serialized;
chunk.SerializeTo(serialized);
@@ -68,7 +68,7 @@
EXPECT_EQ(deserialized.ToString(),
"I-DATA, type=ordered::first, tsn=123, stream_id=456, "
- "message_id=789, ppid=53, length=1");
+ "message_id=789, ppid=53, length=3");
}
TEST(IDataChunkTest, InMiddleFromCapture) {
@@ -100,7 +100,7 @@
TEST(IDataChunkTest, InMiddleSerializeAndDeserialize) {
IDataChunk chunk(TSN(123), StreamID(456), MID(789), PPID(0), FSN(101112),
- {1, 2, 3}, /*options=*/{});
+ rtc::CopyOnWriteBuffer({1, 2, 3}), /*options=*/{});
std::vector<uint8_t> serialized;
chunk.SerializeTo(serialized);
diff --git a/net/dcsctp/packet/data.h b/net/dcsctp/packet/data.h
index c1754ed..b148f1d 100644
--- a/net/dcsctp/packet/data.h
+++ b/net/dcsctp/packet/data.h
@@ -16,6 +16,7 @@
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/public/types.h"
+#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@@ -45,7 +46,7 @@
MID message_id,
FSN fsn,
PPID ppid,
- std::vector<uint8_t> payload,
+ rtc::CopyOnWriteBuffer payload,
IsBeginning is_beginning,
IsEnd is_end,
IsUnordered is_unordered)
@@ -90,7 +91,7 @@
PPID ppid;
// The actual data payload.
- std::vector<uint8_t> payload;
+ rtc::CopyOnWriteBuffer payload;
// If this data represents the first, last or a middle chunk.
IsBeginning is_beginning;
diff --git a/net/dcsctp/packet/sctp_packet_test.cc b/net/dcsctp/packet/sctp_packet_test.cc
index 7438315..22f3db3 100644
--- a/net/dcsctp/packet/sctp_packet_test.cc
+++ b/net/dcsctp/packet/sctp_packet_test.cc
@@ -242,10 +242,10 @@
{SackChunk::GapAckBlock(2, 3)},
/*duplicate_tsns=*/{TSN(1), TSN(2), TSN(3)}));
b.Add(DataChunk(TSN(123), StreamID(456), SSN(789), PPID(9090),
- /*payload=*/{1, 2, 3, 4, 5},
+ /*payload=*/rtc::CopyOnWriteBuffer({1, 2, 3, 4, 5}),
/*options=*/{}));
b.Add(DataChunk(TSN(124), StreamID(654), SSN(987), PPID(909),
- /*payload=*/{5, 4, 3, 3, 1},
+ /*payload=*/rtc::CopyOnWriteBuffer({5, 4, 3, 3, 1}),
/*options=*/{}));
std::vector<uint8_t> serialized = b.Build();
@@ -319,7 +319,7 @@
// Add a smaller packet first.
DataChunk::Options data_options;
- std::vector<uint8_t> payload1(183);
+ rtc::CopyOnWriteBuffer payload1(183);
builder.Add(
DataChunk(TSN(1), StreamID(1), SSN(0), PPID(53), payload1, data_options));
@@ -328,7 +328,7 @@
kMaxPacketSize - kSctpHeaderSize - chunk1_size);
EXPECT_EQ(builder.bytes_remaining(), 976u); // Hand-calculated.
- std::vector<uint8_t> payload2(957);
+ rtc::CopyOnWriteBuffer payload2(957);
builder.Add(
DataChunk(TSN(1), StreamID(1), SSN(0), PPID(53), payload2, data_options));
diff --git a/net/dcsctp/public/dcsctp_message.h b/net/dcsctp/public/dcsctp_message.h
index 38e6763..9477133 100644
--- a/net/dcsctp/public/dcsctp_message.h
+++ b/net/dcsctp/public/dcsctp_message.h
@@ -16,6 +16,7 @@
#include "api/array_view.h"
#include "net/dcsctp/public/types.h"
+#include "rtc_base/copy_on_write_buffer.h"
namespace dcsctp {
@@ -24,7 +25,14 @@
// identifier (`ppid`).
class DcSctpMessage {
public:
+ // For best performance, please use the other constructor for zero-copy.
DcSctpMessage(StreamID stream_id, PPID ppid, std::vector<uint8_t> payload)
+ : stream_id_(stream_id), ppid_(ppid), payload_(payload) {}
+
+ explicit DcSctpMessage(StreamID stream_id,
+ PPID ppid,
+ rtc::CopyOnWriteBuffer payload,
+ bool)
: stream_id_(stream_id), ppid_(ppid), payload_(std::move(payload)) {}
DcSctpMessage(DcSctpMessage&& other) = default;
@@ -40,14 +48,25 @@
// The payload of the message.
rtc::ArrayView<const uint8_t> payload() const { return payload_; }
+ const rtc::CopyOnWriteBuffer& buffer_payload() const { return payload_; }
// When destructing the message, extracts the payload.
- std::vector<uint8_t> ReleasePayload() && { return std::move(payload_); }
+ // Deprecated method - please use `ReleaseBufferPayload`.
+ ABSL_DEPRECATED("Use ReleaseBufferPayload instead")
+ std::vector<uint8_t> ReleasePayload() && {
+ return std::vector<uint8_t>(payload_.cdata(),
+ payload_.cdata() + payload_.size());
+ }
+
+ // When destructing the message, extracts the payload.
+ rtc::CopyOnWriteBuffer ReleaseBufferPayload() && {
+ return std::move(payload_);
+ }
private:
StreamID stream_id_;
PPID ppid_;
- std::vector<uint8_t> payload_;
+ rtc::CopyOnWriteBuffer payload_;
};
} // namespace dcsctp
diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc
index d1e3bf6..8ba01a8 100644
--- a/net/dcsctp/rx/reassembly_queue_test.cc
+++ b/net/dcsctp/rx/reassembly_queue_test.cc
@@ -99,10 +99,9 @@
Data::IsBeginning is_beginning(tsns[i] == 10);
Data::IsEnd is_end(tsns[i] == 13);
- reasm.Add(TSN(tsns[i]),
- Data(kStreamID, kSSN, kMID, kFSN, kPPID,
- std::vector<uint8_t>(span.begin(), span.end()),
- is_beginning, is_end, IsUnordered(false)));
+ reasm.Add(TSN(tsns[i]), Data(kStreamID, kSSN, kMID, kFSN, kPPID,
+ rtc::CopyOnWriteBuffer(span), is_beginning,
+ is_end, IsUnordered(false)));
if (i < 3) {
EXPECT_FALSE(reasm.HasMessages());
} else {
@@ -135,10 +134,9 @@
Data::IsEnd is_end(true);
SSN ssn(static_cast<uint16_t>(tsns[i] - 10));
- reasm.Add(TSN(tsns[i]),
- Data(kStreamID, ssn, kMID, kFSN, kPPID,
- std::vector<uint8_t>(span.begin(), span.end()),
- is_beginning, is_end, IsUnordered(false)));
+ reasm.Add(TSN(tsns[i]), Data(kStreamID, ssn, kMID, kFSN, kPPID,
+ rtc::CopyOnWriteBuffer(span), is_beginning,
+ is_end, IsUnordered(false)));
}
EXPECT_THAT(
reasm.FlushMessages(),
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc
index d004824..f745e463 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams.cc
+++ b/net/dcsctp/rx/traditional_reassembly_streams.cc
@@ -145,29 +145,32 @@
const Data& data = start->second;
size_t payload_size = start->second.size();
UnwrappedTSN tsns[1] = {start->first};
- DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+ DcSctpMessage message(data.stream_id, data.ppid, data.payload, false);
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
}
// Slow path - will need to concatenate the payload.
std::vector<UnwrappedTSN> tsns;
- std::vector<uint8_t> payload;
size_t payload_size = std::accumulate(
start, end, 0,
[](size_t v, const auto& p) { return v + p.second.size(); });
tsns.reserve(count);
- payload.reserve(payload_size);
+ rtc::CopyOnWriteBuffer payload(payload_size);
+
+ size_t offset = 0;
for (auto it = start; it != end; ++it) {
const Data& data = it->second;
tsns.push_back(it->first);
- payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+ memcpy(reinterpret_cast<void*>(payload.MutableData() + offset),
+ data.payload.cdata(), data.payload.size());
+ offset += data.payload.size();
}
DcSctpMessage message(start->second.stream_id, start->second.ppid,
- std::move(payload));
+ std::move(payload), false);
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 8153910..554fab9 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -977,7 +977,7 @@
AnyDataChunk::ImmediateAckFlag immediate_ack = chunk.options().immediate_ack;
Data data = std::move(chunk).extract();
- if (data.payload.empty()) {
+ if (data.payload.size() == 0) {
// Empty DATA chunks are illegal.
packet_sender_.Send(tcb_->PacketBuilder().Add(
ErrorChunk(Parameters::Builder().Add(NoUserDataCause(tsn)).Build())));
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index 66876e4..cbe530d 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1396,8 +1396,10 @@
opts.is_beginning = Data::IsBeginning(true);
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(tsn, StreamID(1), SSN(0), PPID(53),
- std::vector<uint8_t>(kWatermarkLimit + 1), opts))
+ .Add(DataChunk(
+ tsn, StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kWatermarkLimit + 1)),
+ opts))
.Build());
// First DATA will always trigger a SACK. It's not interesting.
@@ -1405,11 +1407,12 @@
AllOf(HasSackWithCumAckTsn(tsn), HasSackWithNoGapAckBlocks()));
// This DATA should be accepted - it's advancing cum ack tsn.
- sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(AddTo(tsn, 1), StreamID(1), SSN(0),
- PPID(53), std::vector<uint8_t>(1),
- /*options=*/{}))
- .Build());
+ sock_z2.ReceivePacket(
+ SctpPacket::Builder(sock_z2.verification_tag(), options)
+ .Add(DataChunk(AddTo(tsn, 1), StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
+ /*options=*/{}))
+ .Build());
// The receiver might have moved into delayed ack mode.
cb_z2.AdvanceTime(options.rto_initial);
@@ -1420,11 +1423,12 @@
AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks()));
// This DATA will not be accepted - it's not advancing cum ack tsn.
- sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0),
- PPID(53), std::vector<uint8_t>(1),
- /*options=*/{}))
- .Build());
+ sock_z2.ReceivePacket(
+ SctpPacket::Builder(sock_z2.verification_tag(), options)
+ .Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
+ /*options=*/{}))
+ .Build());
// Sack will be sent in IMMEDIATE mode when this is happening.
EXPECT_THAT(
@@ -1432,11 +1436,12 @@
AllOf(HasSackWithCumAckTsn(AddTo(tsn, 1)), HasSackWithNoGapAckBlocks()));
// This DATA will not be accepted either.
- sock_z2.ReceivePacket(SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(AddTo(tsn, 4), StreamID(1), SSN(0),
- PPID(53), std::vector<uint8_t>(1),
- /*options=*/{}))
- .Build());
+ sock_z2.ReceivePacket(
+ SctpPacket::Builder(sock_z2.verification_tag(), options)
+ .Add(DataChunk(AddTo(tsn, 4), StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(1)),
+ /*options=*/{}))
+ .Build());
// Sack will be sent in IMMEDIATE mode when this is happening.
EXPECT_THAT(
@@ -1446,9 +1451,10 @@
// This DATA should be accepted, and it fills the reassembly queue.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53),
- std::vector<uint8_t>(kRemainingSize),
- /*options=*/{}))
+ .Add(DataChunk(
+ AddTo(tsn, 2), StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kRemainingSize)),
+ /*options=*/{}))
.Build());
// The receiver might have moved into delayed ack mode.
@@ -1465,9 +1471,10 @@
// This DATA will make the connection close. It's too full now.
sock_z2.ReceivePacket(
SctpPacket::Builder(sock_z2.verification_tag(), options)
- .Add(DataChunk(AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
- std::vector<uint8_t>(kSmallMessageSize),
- /*options=*/{}))
+ .Add(DataChunk(
+ AddTo(tsn, 3), StreamID(1), SSN(0), PPID(53),
+ rtc::CopyOnWriteBuffer(std::vector<uint8_t>(kSmallMessageSize)),
+ /*options=*/{}))
.Build());
}
diff --git a/net/dcsctp/testing/data_generator.cc b/net/dcsctp/testing/data_generator.cc
index e4f9f91..647c121 100644
--- a/net/dcsctp/testing/data_generator.cc
+++ b/net/dcsctp/testing/data_generator.cc
@@ -34,8 +34,8 @@
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(static_cast<uint16_t>(*message_id)),
- message_id, fsn_, opts.ppid, std::move(payload), is_beginning,
- is_end, IsUnordered(false));
+ message_id, fsn_, opts.ppid, rtc::CopyOnWriteBuffer(payload),
+ is_beginning, is_end, IsUnordered(false));
if (is_end) {
message_id_ = MID(*message_id + 1);
@@ -56,7 +56,8 @@
}
MID message_id = opts.message_id.value_or(message_id_);
Data ret = Data(opts.stream_id, SSN(0), message_id, fsn_, kPpid,
- std::move(payload), is_beginning, is_end, IsUnordered(true));
+ rtc::CopyOnWriteBuffer(payload), is_beginning, is_end,
+ IsUnordered(true));
if (is_end) {
message_id_ = MID(*message_id + 1);
}
diff --git a/net/dcsctp/tx/outstanding_data.cc b/net/dcsctp/tx/outstanding_data.cc
index dc998de..abb5dc1 100644
--- a/net/dcsctp/tx/outstanding_data.cc
+++ b/net/dcsctp/tx/outstanding_data.cc
@@ -248,7 +248,7 @@
next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn,
item.data().message_id, item.data().fsn, item.data().ppid,
- std::vector<uint8_t>(), Data::IsBeginning(false),
+ rtc::CopyOnWriteBuffer(), Data::IsBeginning(false),
Data::IsEnd(true), item.data().is_unordered);
Item& added_item =
outstanding_data_
@@ -357,10 +357,10 @@
size_t chunk_size = GetSerializedChunkSize(data);
outstanding_bytes_ += chunk_size;
++outstanding_items_;
- auto it = outstanding_data_
- .emplace(tsn, Item(data.Clone(), max_retransmissions, time_sent,
- expires_at))
- .first;
+ auto it =
+ outstanding_data_
+ .emplace(tsn, Item(data, max_retransmissions, time_sent, expires_at))
+ .first;
if (it->second.has_expired(time_sent)) {
// No need to send it - it was expired when it was in the send
diff --git a/net/dcsctp/tx/outstanding_data.h b/net/dcsctp/tx/outstanding_data.h
index dc9aab7..a49a9c6 100644
--- a/net/dcsctp/tx/outstanding_data.h
+++ b/net/dcsctp/tx/outstanding_data.h
@@ -147,14 +147,14 @@
kAbandon,
};
- explicit Item(Data data,
+ explicit Item(const Data& data,
MaxRetransmits max_retransmissions,
TimeMs time_sent,
TimeMs expires_at)
: max_retransmissions_(max_retransmissions),
time_sent_(time_sent),
expires_at_(expires_at),
- data_(std::move(data)) {}
+ data_(data.Clone()) {}
TimeMs time_sent() const { return time_sent_; }
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 21744cc..5909b78 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -9,6 +9,7 @@
*/
#include "net/dcsctp/tx/rr_send_queue.h"
+#include <algorithm>
#include <cstdint>
#include <deque>
#include <limits>
@@ -24,6 +25,7 @@
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
+#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/logging.h"
namespace dcsctp {
@@ -96,7 +98,14 @@
}
}
- return total_buffered_amount == total_buffered_amount_.value();
+ if (total_buffered_amount != total_buffered_amount_.value()) {
+ RTC_DLOG(LS_ERROR) << "Actual total_buffered_amount="
+ << total_buffered_amount
+ << " != expected total_buffered_amount="
+ << total_buffered_amount_.value();
+ return false;
+ }
+ return true;
}
bool RRSendQueue::OutgoingStream::IsConsistent() const {
@@ -104,7 +113,13 @@
for (const auto& item : items_) {
bytes += item.remaining_size;
}
- return bytes == buffered_amount_.value();
+ if (bytes != buffered_amount_.value()) {
+ RTC_DLOG(LS_ERROR) << "Actual buffered amount=" << bytes
+ << " != expected buffered_amount_="
+ << buffered_amount_.value();
+ return false;
+ }
+ return true;
}
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
@@ -161,22 +176,20 @@
}
// Grab the next `max_size` fragment from this message and calculate flags.
- rtc::ArrayView<const uint8_t> chunk_payload =
- item->message.payload().subview(item->remaining_offset, max_size);
- rtc::ArrayView<const uint8_t> message_payload = message.payload();
- Data::IsBeginning is_beginning(chunk_payload.data() ==
- message_payload.data());
- Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
- (message_payload.data() + message_payload.size()));
+ size_t actual_size = std::min(max_size, item->remaining_size);
+ RTC_DCHECK(actual_size > 0);
+ Data::IsBeginning is_beginning(item->remaining_offset == 0);
+ Data::IsEnd is_end(actual_size == item->remaining_size);
StreamID stream_id = message.stream_id();
PPID ppid = message.ppid();
// Zero-copy the payload if the message fits in a single chunk.
- std::vector<uint8_t> payload =
+ rtc::CopyOnWriteBuffer payload =
is_beginning && is_end
- ? std::move(message).ReleasePayload()
- : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
+ ? std::move(message).ReleaseBufferPayload()
+ : message.buffer_payload().Slice(item->remaining_offset, actual_size);
+ RTC_DCHECK_EQ(payload.size(), actual_size);
FSN fsn(item->current_fsn);
item->current_fsn = FSN(*item->current_fsn + 1);
@@ -202,8 +215,8 @@
// it can safely be discarded.
items_.pop_front();
} else {
- item->remaining_offset += chunk_payload.size();
- item->remaining_size -= chunk_payload.size();
+ item->remaining_offset += actual_size;
+ item->remaining_size -= actual_size;
RTC_DCHECK(item->remaining_offset + item->remaining_size ==
item->message.payload().size());
RTC_DCHECK(item->remaining_size > 0);