dcsctp: Add a fastpath for interleaved reassembly

The same as https://webrtc-review.googlesource.com/c/src/+/331340, but
for interleaved messages.

This avoids inserting into maps where possible, and also fixes a bug
when the payload was accidentally copied unintentionally -
crbug.com/365594101.

Bug: chromium:365594101
Change-Id: Iaeaa97b0cf3a26ada9afc61f2545760b7ab4c731
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/363960
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#43099}
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.cc b/net/dcsctp/rx/interleaved_reassembly_streams.cc
index 9dc2e43..0e74c24 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams.cc
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.cc
@@ -37,14 +37,13 @@
 
 size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
     UnwrappedMID mid) {
-  std::map<UnwrappedMID, ChunkMap>::const_iterator it =
-      chunks_by_mid_.find(mid);
+  std::map<UnwrappedMID, ChunkMap>::iterator it = chunks_by_mid_.find(mid);
   if (it == chunks_by_mid_.end()) {
     RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
                          << *mid.Wrap() << " - no chunks";
     return 0;
   }
-  const ChunkMap& chunks = it->second;
+  ChunkMap& chunks = it->second;
   if (!chunks.begin()->second.second.is_beginning ||
       !chunks.rbegin()->second.second.is_end) {
     RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
@@ -69,17 +68,22 @@
   return removed_bytes;
 }
 
+size_t InterleavedReassemblyStreams::Stream::AssembleMessage(UnwrappedTSN tsn,
+                                                             Data data) {
+  size_t payload_size = data.size();
+  UnwrappedTSN tsns[1] = {tsn};
+  DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+  parent_.on_assembled_message_(tsns, std::move(message));
+  return payload_size;
+}
+
 size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
-    const ChunkMap& tsn_chunks) {
+    ChunkMap& tsn_chunks) {
   size_t count = tsn_chunks.size();
   if (count == 1) {
     // Fast path - zero-copy
-    const Data& data = tsn_chunks.begin()->second.second;
-    size_t payload_size = data.size();
-    UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
-    DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
-    parent_.on_assembled_message_(tsns, std::move(message));
-    return payload_size;
+    return AssembleMessage(tsn_chunks.begin()->second.first,
+                           std::move(tsn_chunks.begin()->second.second));
   }
 
   // Slow path - will need to concatenate the payload.
@@ -137,6 +141,21 @@
   int queued_bytes = data.size();
   UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid);
   FSN fsn = data.fsn;
+
+  // Avoid inserting it into any map if it can be delivered directly.
+  if (stream_id_.unordered && data.is_beginning && data.is_end) {
+    AssembleMessage(tsn, std::move(data));
+    return 0;
+
+  } else if (!stream_id_.unordered && mid == next_mid_ && data.is_beginning &&
+             data.is_end) {
+    AssembleMessage(tsn, std::move(data));
+    next_mid_.Increment();
+    // This might unblock assembling more messages.
+    return -TryToAssembleMessages();
+  }
+
+  // Slow path.
   auto [unused, inserted] =
       chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
   if (!inserted) {
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.h b/net/dcsctp/rx/interleaved_reassembly_streams.h
index a6faadd..2dd7d16 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams.h
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.h
@@ -81,7 +81,9 @@
     // Try to assemble one message identified by `mid`.
     // Returns the number of bytes assembled if a message was assembled.
     size_t TryToAssembleMessage(UnwrappedMID mid);
-    size_t AssembleMessage(const ChunkMap& tsn_chunks);
+    size_t AssembleMessage(ChunkMap& tsn_chunks);
+    size_t AssembleMessage(UnwrappedTSN tsn, Data data);
+
     // Try to assemble one or several messages in order from the stream.
     // Returns the number of bytes assembled if one or more messages were
     // assembled.
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
index df4024e..adcd514 100644
--- a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
+++ b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
@@ -24,8 +24,10 @@
 
 namespace dcsctp {
 namespace {
+using ::testing::ElementsAre;
 using ::testing::MockFunction;
 using ::testing::NiceMock;
+using ::testing::Property;
 
 class InterleavedReassemblyStreamsTest : public testing::Test {
  protected:
@@ -150,5 +152,62 @@
   EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
 }
 
+TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathUnordered) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  {
+    testing::InSequence s;
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(1)),
+                     Property(&DcSctpMessage::payload, ElementsAre(1))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(3)),
+                     Property(&DcSctpMessage::payload, ElementsAre(3))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(2)),
+                     Property(&DcSctpMessage::payload, ElementsAre(2))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(4)),
+                     Property(&DcSctpMessage::payload, ElementsAre(4))));
+  }
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest, CanReassembleFastPathOrdered) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  {
+    testing::InSequence s;
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(1)),
+                     Property(&DcSctpMessage::payload, ElementsAre(1))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(2)),
+                     Property(&DcSctpMessage::payload, ElementsAre(2))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(3)),
+                     Property(&DcSctpMessage::payload, ElementsAre(3))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(4)),
+                     Property(&DcSctpMessage::payload, ElementsAre(4))));
+  }
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  Data data1 = gen_.Ordered({1}, "BE");
+  Data data2 = gen_.Ordered({2}, "BE");
+  Data data3 = gen_.Ordered({3}, "BE");
+  Data data4 = gen_.Ordered({4}, "BE");
+  EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
+  EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
+  EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
+  EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
+}
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
index 3418704..9aa0cec 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams_test.cc
+++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
@@ -253,5 +253,62 @@
   EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4}, "BE")), 0);
 }
 
+TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathUnordered) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  {
+    testing::InSequence s;
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(1)),
+                     Property(&DcSctpMessage::payload, ElementsAre(1))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(3)),
+                     Property(&DcSctpMessage::payload, ElementsAre(3))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(2)),
+                     Property(&DcSctpMessage::payload, ElementsAre(2))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(4)),
+                     Property(&DcSctpMessage::payload, ElementsAre(4))));
+  }
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({3}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2}, "BE")), 0);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({4}, "BE")), 0);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest, CanReassembleFastPathOrdered) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  {
+    testing::InSequence s;
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(1)),
+                     Property(&DcSctpMessage::payload, ElementsAre(1))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(2)),
+                     Property(&DcSctpMessage::payload, ElementsAre(2))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(3)),
+                     Property(&DcSctpMessage::payload, ElementsAre(3))));
+    EXPECT_CALL(on_assembled,
+                Call(ElementsAre(tsn(4)),
+                     Property(&DcSctpMessage::payload, ElementsAre(4))));
+  }
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  Data data1 = gen_.Ordered({1}, "BE");
+  Data data2 = gen_.Ordered({2}, "BE");
+  Data data3 = gen_.Ordered({3}, "BE");
+  Data data4 = gen_.Ordered({4}, "BE");
+  EXPECT_EQ(streams.Add(tsn(1), std::move(data1)), 0);
+  EXPECT_EQ(streams.Add(tsn(3), std::move(data3)), 1);
+  EXPECT_EQ(streams.Add(tsn(2), std::move(data2)), -1);
+  EXPECT_EQ(streams.Add(tsn(4), std::move(data4)), 0);
+}
 }  // namespace
 }  // namespace dcsctp