dcsctp: Add a fastpath for interleaved reassembly
The same as https://webrtc-review.googlesource.com/c/src/+/331340, but
for interleaved messages.
This avoids inserting into maps where possible, and also fixes a bug
when the payload was accidentally copied unintentionally -
crbug.com/365594101.
Bug: chromium:365594101
Change-Id: Iaeaa97b0cf3a26ada9afc61f2545760b7ab4c731
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/363960
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43099}
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.cc b/net/dcsctp/rx/interleaved_reassembly_streams.cc
index 9dc2e43..0e74c24 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams.cc
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.cc
@@ -37,14 +37,13 @@
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
UnwrappedMID mid) {
- std::map<UnwrappedMID, ChunkMap>::const_iterator it =
- chunks_by_mid_.find(mid);
+ std::map<UnwrappedMID, ChunkMap>::iterator it = chunks_by_mid_.find(mid);
if (it == chunks_by_mid_.end()) {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
<< *mid.Wrap() << " - no chunks";
return 0;
}
- const ChunkMap& chunks = it->second;
+ ChunkMap& chunks = it->second;
if (!chunks.begin()->second.second.is_beginning ||
!chunks.rbegin()->second.second.is_end) {
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
@@ -69,17 +68,22 @@
return removed_bytes;
}
+size_t InterleavedReassemblyStreams::Stream::AssembleMessage(UnwrappedTSN tsn,
+ Data data) {
+ size_t payload_size = data.size();
+ UnwrappedTSN tsns[1] = {tsn};
+ DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+ parent_.on_assembled_message_(tsns, std::move(message));
+ return payload_size;
+}
+
size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
- const ChunkMap& tsn_chunks) {
+ ChunkMap& tsn_chunks) {
size_t count = tsn_chunks.size();
if (count == 1) {
// Fast path - zero-copy
- const Data& data = tsn_chunks.begin()->second.second;
- size_t payload_size = data.size();
- UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
- DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
- parent_.on_assembled_message_(tsns, std::move(message));
- return payload_size;
+ return AssembleMessage(tsn_chunks.begin()->second.first,
+ std::move(tsn_chunks.begin()->second.second));
}
// Slow path - will need to concatenate the payload.
@@ -137,6 +141,21 @@
int queued_bytes = data.size();
UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid);
FSN fsn = data.fsn;
+
+ // Avoid inserting it into any map if it can be delivered directly.
+ if (stream_id_.unordered && data.is_beginning && data.is_end) {
+ AssembleMessage(tsn, std::move(data));
+ return 0;
+
+ } else if (!stream_id_.unordered && mid == next_mid_ && data.is_beginning &&
+ data.is_end) {
+ AssembleMessage(tsn, std::move(data));
+ next_mid_.Increment();
+ // This might unblock assembling more messages.
+ return -TryToAssembleMessages();
+ }
+
+ // Slow path.
auto [unused, inserted] =
chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
if (!inserted) {
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.h b/net/dcsctp/rx/interleaved_reassembly_streams.h
index a6faadd..2dd7d16 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams.h
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.h
@@ -81,7 +81,9 @@
// Try to assemble one message identified by `mid`.
// Returns the number of bytes assembled if a message was assembled.
size_t TryToAssembleMessage(UnwrappedMID mid);
- size_t AssembleMessage(const ChunkMap& tsn_chunks);
+ size_t AssembleMessage(ChunkMap& tsn_chunks);
+ size_t AssembleMessage(UnwrappedTSN tsn, Data data);
+
// Try to assemble one or several messages in order from the stream.
// Returns the number of bytes assembled if one or more messages were
// assembled.
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
index df4024e..adcd514 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
+++ b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
@@ -24,8 +24,10 @@
namespace dcsctp {
namespace {
+using ::testing::ElementsAre;
using ::testing::MockFunction;
using ::testing::NiceMock;
+using ::testing::Property;
class InterleavedReassemblyStreamsTest : public testing::Test {
protected:
@@ -150,5 +152,62 @@
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
}
+TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathUnordered) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ {
+ testing::InSequence s;
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(1)),
+ Property(&DcSctpMessage::payload, ElementsAre(1))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(3)),
+ Property(&DcSctpMessage::payload, ElementsAre(3))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(2)),
+ Property(&DcSctpMessage::payload, ElementsAre(2))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(4)),
+ Property(&DcSctpMessage::payload, ElementsAre(4))));
+ }
+
+ InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathOrdered) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ {
+ testing::InSequence s;
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(1)),
+ Property(&DcSctpMessage::payload, ElementsAre(1))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(2)),
+ Property(&DcSctpMessage::payload, ElementsAre(2))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(3)),
+ Property(&DcSctpMessage::payload, ElementsAre(3))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(4)),
+ Property(&DcSctpMessage::payload, ElementsAre(4))));
+ }
+
+ InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ Data data1 = gen_.Ordered({1}, "BE");
+ Data data2 = gen_.Ordered({2}, "BE");
+ Data data3 = gen_.Ordered({3}, "BE");
+ Data data4 = gen_.Ordered({4}, "BE");
+ EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
+ EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
+ EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
+ EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
+}
} // namespace
} // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
index 3418704..9aa0cec 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams_test.cc
+++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
@@ -253,5 +253,62 @@
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4}, "BE")), 0);
}
+TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathUnordered) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ {
+ testing::InSequence s;
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(1)),
+ Property(&DcSctpMessage::payload, ElementsAre(1))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(3)),
+ Property(&DcSctpMessage::payload, ElementsAre(3))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(2)),
+ Property(&DcSctpMessage::payload, ElementsAre(2))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(4)),
+ Property(&DcSctpMessage::payload, ElementsAre(4))));
+ }
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathOrdered) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ {
+ testing::InSequence s;
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(1)),
+ Property(&DcSctpMessage::payload, ElementsAre(1))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(2)),
+ Property(&DcSctpMessage::payload, ElementsAre(2))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(3)),
+ Property(&DcSctpMessage::payload, ElementsAre(3))));
+ EXPECT_CALL(on_assembled,
+ Call(ElementsAre(tsn(4)),
+ Property(&DcSctpMessage::payload, ElementsAre(4))));
+ }
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ Data data1 = gen_.Ordered({1}, "BE");
+ Data data2 = gen_.Ordered({2}, "BE");
+ Data data3 = gen_.Ordered({3}, "BE");
+ Data data4 = gen_.Ordered({4}, "BE");
+ EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
+ EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
+ EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
+ EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
+}
} // namespace
} // namespace dcsctp