dcsctp: Add interleaved reassembly streams

This is the receive-side part of supporting what is frequently called
"ndata", but actually RFC8260 - "User Message Interleaving".

This CL adds a new ReassemblyStreams implementation that can assemble
I-DATA chunks and process I-FORWARD-TSN for partial reliability.

Bug: webrtc:5696
Change-Id: I3cfbea62e7b6c02fbd3f51b43ba3fb7863cf0f88
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/218506
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37128}
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index d24f6d6..8ef60dc 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -44,6 +44,28 @@
   absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
 }
 
+rtc_library("interleaved_reassembly_streams") {
+  deps = [
+    ":reassembly_streams",
+    "../../../api:array_view",
+    "../../../rtc_base",
+    "../../../rtc_base:checks",
+    "../../../rtc_base:logging",
+    "../common:sequence_numbers",
+    "../packet:chunk",
+    "../packet:data",
+    "../public:types",
+  ]
+  sources = [
+    "interleaved_reassembly_streams.cc",
+    "interleaved_reassembly_streams.h",
+  ]
+  absl_deps = [
+    "//third_party/abseil-cpp/absl/algorithm:container",
+    "//third_party/abseil-cpp/absl/strings",
+    "//third_party/abseil-cpp/absl/types:optional",
+  ]
+}
 rtc_library("traditional_reassembly_streams") {
   deps = [
     ":reassembly_streams",
@@ -68,6 +90,7 @@
 
 rtc_library("reassembly_queue") {
   deps = [
+    ":interleaved_reassembly_streams",
     ":reassembly_streams",
     ":traditional_reassembly_streams",
     "../../../api:array_view",
@@ -98,6 +121,7 @@
 
     deps = [
       ":data_tracker",
+      ":interleaved_reassembly_streams",
       ":reassembly_queue",
       ":reassembly_streams",
       ":traditional_reassembly_streams",
@@ -117,6 +141,7 @@
     absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
     sources = [
       "data_tracker_test.cc",
+      "interleaved_reassembly_streams_test.cc",
       "reassembly_queue_test.cc",
       "traditional_reassembly_streams_test.cc",
     ]
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.cc b/net/dcsctp/rx/interleaved_reassembly_streams.cc
new file mode 100644
index 0000000..847058b
--- /dev/null
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.cc
@@ -0,0 +1,270 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
+
+#include <stddef.h>
+
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <numeric>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/types.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+
+InterleavedReassemblyStreams::InterleavedReassemblyStreams(
+    absl::string_view log_prefix,
+    OnAssembledMessage on_assembled_message,
+    const DcSctpSocketHandoverState* handover_state)
+    : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {
+  if (handover_state) {
+    for (const DcSctpSocketHandoverState::OrderedStream& state :
+         handover_state->rx.ordered_streams) {
+      FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
+      streams_.emplace(
+          std::piecewise_construct, std::forward_as_tuple(stream_id),
+          std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
+    }
+    for (const DcSctpSocketHandoverState::UnorderedStream& state :
+         handover_state->rx.unordered_streams) {
+      FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
+      streams_.emplace(std::piecewise_construct,
+                       std::forward_as_tuple(stream_id),
+                       std::forward_as_tuple(stream_id, this));
+    }
+  }
+}
+
+size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
+    UnwrappedMID mid) {
+  std::map<UnwrappedMID, ChunkMap>::const_iterator it =
+      chunks_by_mid_.find(mid);
+  if (it == chunks_by_mid_.end()) {
+    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+                         << *mid.Wrap() << " - no chunks";
+    return 0;
+  }
+  const ChunkMap& chunks = it->second;
+  if (!chunks.begin()->second.second.is_beginning ||
+      !chunks.rbegin()->second.second.is_end) {
+    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+                         << *mid.Wrap() << "- missing beginning or end";
+    return 0;
+  }
+  int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first;
+  if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) {
+    RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+                         << *mid.Wrap() << "- not all chunks exist (have "
+                         << chunks.size() << ", expect " << (fsn_diff + 1)
+                         << ")";
+    return 0;
+  }
+
+  size_t removed_bytes = AssembleMessage(chunks);
+  RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
+                       << *mid.Wrap() << " - succeeded and removed "
+                       << removed_bytes;
+
+  chunks_by_mid_.erase(mid);
+  return removed_bytes;
+}
+
+size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
+    const ChunkMap& tsn_chunks) {
+  size_t count = tsn_chunks.size();
+  if (count == 1) {
+    // Fast path - zero-copy
+    const Data& data = tsn_chunks.begin()->second.second;
+    size_t payload_size = data.size();
+    UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
+    DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+    parent_.on_assembled_message_(tsns, std::move(message));
+    return payload_size;
+  }
+
+  // Slow path - will need to concatenate the payload.
+  std::vector<UnwrappedTSN> tsns;
+  tsns.reserve(count);
+
+  std::vector<uint8_t> payload;
+  size_t payload_size = absl::c_accumulate(
+      tsn_chunks, 0,
+      [](size_t v, const auto& p) { return v + p.second.second.size(); });
+  payload.reserve(payload_size);
+
+  for (auto& item : tsn_chunks) {
+    const UnwrappedTSN tsn = item.second.first;
+    const Data& data = item.second.second;
+    tsns.push_back(tsn);
+    payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+  }
+
+  const Data& data = tsn_chunks.begin()->second.second;
+
+  DcSctpMessage message(data.stream_id, data.ppid, std::move(payload));
+  parent_.on_assembled_message_(tsns, std::move(message));
+  return payload_size;
+}
+
+size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
+  UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
+
+  size_t removed_bytes = 0;
+  auto it = chunks_by_mid_.begin();
+  while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
+    removed_bytes += absl::c_accumulate(
+        it->second, 0,
+        [](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
+    it = chunks_by_mid_.erase(it);
+  }
+
+  if (!stream_id_.unordered) {
+    // For ordered streams, erasing a message might suddenly unblock that queue
+    // and allow it to deliver any following received messages.
+    if (unwrapped_mid >= next_mid_) {
+      next_mid_ = unwrapped_mid.next_value();
+    }
+
+    removed_bytes += TryToAssembleMessages();
+  }
+
+  return removed_bytes;
+}
+
+int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
+  RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
+  RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
+  int queued_bytes = data.size();
+  UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
+  FSN fsn = data.fsn;
+  auto [unused, inserted] =
+      chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
+  if (!inserted) {
+    return 0;
+  }
+
+  if (stream_id_.unordered) {
+    queued_bytes -= TryToAssembleMessage(mid);
+  } else {
+    if (mid == next_mid_) {
+      queued_bytes -= TryToAssembleMessages();
+    }
+  }
+
+  return queued_bytes;
+}
+
+size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
+  size_t removed_bytes = 0;
+
+  for (;;) {
+    size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
+    if (removed_bytes_this_iter == 0) {
+      break;
+    }
+
+    removed_bytes += removed_bytes_this_iter;
+    next_mid_.Increment();
+  }
+  return removed_bytes;
+}
+
+void InterleavedReassemblyStreams::Stream::AddHandoverState(
+    DcSctpSocketHandoverState& state) const {
+  if (stream_id_.unordered) {
+    DcSctpSocketHandoverState::UnorderedStream state_stream;
+    state_stream.id = stream_id_.stream_id.value();
+    state.rx.unordered_streams.push_back(std::move(state_stream));
+  } else {
+    DcSctpSocketHandoverState::OrderedStream state_stream;
+    state_stream.id = stream_id_.stream_id.value();
+    state_stream.next_ssn = next_mid_.Wrap().value();
+    state.rx.ordered_streams.push_back(std::move(state_stream));
+  }
+}
+
+InterleavedReassemblyStreams::Stream&
+InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
+  auto it = streams_.find(stream_id);
+  if (it == streams_.end()) {
+    it =
+        streams_
+            .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
+                     std::forward_as_tuple(stream_id, this))
+            .first;
+  }
+  return it->second;
+}
+
+int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
+  return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id))
+      .Add(tsn, std::move(data));
+}
+
+size_t InterleavedReassemblyStreams::HandleForwardTsn(
+    UnwrappedTSN new_cumulative_ack_tsn,
+    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+  size_t removed_bytes = 0;
+  for (const auto& skipped : skipped_streams) {
+    removed_bytes +=
+        GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
+            .EraseTo(skipped.message_id);
+  }
+  return removed_bytes;
+}
+
+void InterleavedReassemblyStreams::ResetStreams(
+    rtc::ArrayView<const StreamID> stream_ids) {
+  if (stream_ids.empty()) {
+    for (auto& entry : streams_) {
+      entry.second.Reset();
+    }
+  } else {
+    for (StreamID stream_id : stream_ids) {
+      GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset();
+      GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset();
+    }
+  }
+}
+
+HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
+    const {
+  HandoverReadinessStatus status;
+  for (const auto& [stream_id, stream] : streams_) {
+    if (stream.has_unassembled_chunks()) {
+      status.Add(
+          stream_id.unordered
+              ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks
+              : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
+      break;
+    }
+  }
+  return status;
+}
+
+void InterleavedReassemblyStreams::AddHandoverState(
+    DcSctpSocketHandoverState& state) {
+  for (const auto& [unused, stream] : streams_) {
+    stream.AddHandoverState(state);
+  }
+}
+
+}  // namespace dcsctp
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams.h b/net/dcsctp/rx/interleaved_reassembly_streams.h
new file mode 100644
index 0000000..9d4bbc7
--- /dev/null
+++ b/net/dcsctp/rx/interleaved_reassembly_streams.h
@@ -0,0 +1,111 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
+#define NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
+
+#include <cstdint>
+#include <map>
+#include <string>
+#include <utility>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+
+namespace dcsctp {
+
+// Handles reassembly of incoming data when interleaved message sending is
+// enabled on the association, i.e. when RFC8260 is in use.
+class InterleavedReassemblyStreams : public ReassemblyStreams {
+ public:
+  InterleavedReassemblyStreams(
+      absl::string_view log_prefix,
+      OnAssembledMessage on_assembled_message,
+      const DcSctpSocketHandoverState* handover_state = nullptr);
+
+  int Add(UnwrappedTSN tsn, Data data) override;
+
+  size_t HandleForwardTsn(
+      UnwrappedTSN new_cumulative_ack_tsn,
+      rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)
+      override;
+
+  void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
+
+  HandoverReadinessStatus GetHandoverReadiness() const override;
+  void AddHandoverState(DcSctpSocketHandoverState& state) override;
+
+ private:
+  struct FullStreamId {
+    const IsUnordered unordered;
+    const StreamID stream_id;
+
+    FullStreamId(IsUnordered unordered, StreamID stream_id)
+        : unordered(unordered), stream_id(stream_id) {}
+
+    friend bool operator<(FullStreamId a, FullStreamId b) {
+      return a.unordered < b.unordered ||
+             (!(a.unordered < b.unordered) && (a.stream_id < b.stream_id));
+    }
+  };
+
+  class Stream {
+   public:
+    Stream(FullStreamId stream_id,
+           InterleavedReassemblyStreams* parent,
+           MID next_mid = MID(0))
+        : stream_id_(stream_id),
+          parent_(*parent),
+          next_mid_(mid_unwrapper_.Unwrap(next_mid)) {}
+    int Add(UnwrappedTSN tsn, Data data);
+    size_t EraseTo(MID message_id);
+    void Reset() {
+      mid_unwrapper_.Reset();
+      next_mid_ = mid_unwrapper_.Unwrap(MID(0));
+    }
+    bool has_unassembled_chunks() const { return !chunks_by_mid_.empty(); }
+    void AddHandoverState(DcSctpSocketHandoverState& state) const;
+
+   private:
+    using ChunkMap = std::map<FSN, std::pair<UnwrappedTSN, Data>>;
+
+    // Try to assemble one message identified by `mid`.
+    // Returns the number of bytes assembled if a message was assembled.
+    size_t TryToAssembleMessage(UnwrappedMID mid);
+    size_t AssembleMessage(const ChunkMap& tsn_chunks);
+    // Try to assemble one or several messages in order from the stream.
+    // Returns the number of bytes assembled if one or more messages were
+    // assembled.
+    size_t TryToAssembleMessages();
+
+    const FullStreamId stream_id_;
+    InterleavedReassemblyStreams& parent_;
+    std::map<UnwrappedMID, ChunkMap> chunks_by_mid_;
+    UnwrappedMID::Unwrapper mid_unwrapper_;
+    UnwrappedMID next_mid_;
+  };
+
+  Stream& GetOrCreateStream(const FullStreamId& stream_id);
+
+  const std::string log_prefix_;
+
+  // Callback for when a message has been assembled.
+  const OnAssembledMessage on_assembled_message_;
+
+  // All unordered and ordered streams, managing not-yet-assembled data.
+  std::map<FullStreamId, Stream> streams_;
+};
+
+}  // namespace dcsctp
+
+#endif  // NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
diff --git a/net/dcsctp/rx/interleaved_reassembly_streams_test.cc b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
new file mode 100644
index 0000000..df4024e
--- /dev/null
+++ b/net/dcsctp/rx/interleaved_reassembly_streams_test.cc
@@ -0,0 +1,154 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+#include "net/dcsctp/testing/data_generator.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::MockFunction;
+using ::testing::NiceMock;
+
+class InterleavedReassemblyStreamsTest : public testing::Test {
+ protected:
+  UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); }
+
+  InterleavedReassemblyStreamsTest() {}
+  DataGenerator gen_;
+  UnwrappedTSN::Unwrapper tsn_;
+};
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       AddUnorderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+  // Adding the end fragment should make it empty again.
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       AddSimpleOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       AddMoreComplexOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  Data late = gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       DeleteUnorderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+
+  IForwardTsnChunk::SkippedStream skipped[] = {
+      IForwardTsnChunk::SkippedStream(IsUnordered(true), StreamID(1), MID(0))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       DeleteSimpleOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+
+  IForwardTsnChunk::SkippedStream skipped[] = {
+      IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       DeleteManyOrderedMessagesReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+  // Expire all three messages
+  IForwardTsnChunk::SkippedStream skipped[] = {
+      IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(2))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u);
+}
+
+TEST_F(InterleavedReassemblyStreamsTest,
+       DeleteOrderedMessageDelivesTwoReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+  // The first ordered message expire, and the following two are delivered.
+  IForwardTsnChunk::SkippedStream skipped[] = {
+      IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
+}
+
+}  // namespace
+}  // namespace dcsctp
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc
index cbf198b..e0c47f7 100644
--- a/net/dcsctp/rx/reassembly_queue.cc
+++ b/net/dcsctp/rx/reassembly_queue.cc
@@ -29,15 +29,32 @@
 #include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
 #include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
 #include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
 #include "net/dcsctp/rx/reassembly_streams.h"
 #include "net/dcsctp/rx/traditional_reassembly_streams.h"
 #include "rtc_base/logging.h"
 
 namespace dcsctp {
+namespace {
+std::unique_ptr<ReassemblyStreams> CreateStreams(
+    absl::string_view log_prefix,
+    ReassemblyStreams::OnAssembledMessage on_assembled_message,
+    bool use_message_interleaving,
+    const DcSctpSocketHandoverState* handover_state) {
+  if (use_message_interleaving) {
+    return std::make_unique<InterleavedReassemblyStreams>(
+        log_prefix, std::move(on_assembled_message), handover_state);
+  }
+  return std::make_unique<TraditionalReassemblyStreams>(
+      log_prefix, std::move(on_assembled_message), handover_state);
+}
+}  // namespace
+
 ReassemblyQueue::ReassemblyQueue(
     absl::string_view log_prefix,
     TSN peer_initial_tsn,
     size_t max_size_bytes,
+    bool use_message_interleaving,
     const DcSctpSocketHandoverState* handover_state)
     : log_prefix_(std::string(log_prefix) + "reasm: "),
       max_size_bytes_(max_size_bytes),
@@ -50,12 +67,13 @@
               ? ReconfigRequestSN(
                     handover_state->rx.last_completed_deferred_reset_req_sn)
               : ReconfigRequestSN(0)),
-      streams_(std::make_unique<TraditionalReassemblyStreams>(
+      streams_(CreateStreams(
           log_prefix_,
           [this](rtc::ArrayView<const UnwrappedTSN> tsns,
                  DcSctpMessage message) {
             AddReassembledMessage(tsns, std::move(message));
           },
+          use_message_interleaving,
           handover_state)) {}
 
 void ReassemblyQueue::Add(TSN tsn, Data data) {
diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h
index 9cc0c61..ab5dd5e 100644
--- a/net/dcsctp/rx/reassembly_queue.h
+++ b/net/dcsctp/rx/reassembly_queue.h
@@ -72,6 +72,7 @@
   ReassemblyQueue(absl::string_view log_prefix,
                   TSN peer_initial_tsn,
                   size_t max_size_bytes,
+                  bool use_message_interleaving = false,
                   const DcSctpSocketHandoverState* handover_state = nullptr);
 
   // Adds a data chunk to the queue, with a `tsn` and other parameters in
diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc
index bc1b776..cac469f 100644
--- a/net/dcsctp/rx/reassembly_queue_test.cc
+++ b/net/dcsctp/rx/reassembly_queue_test.cc
@@ -33,6 +33,7 @@
 namespace {
 using ::testing::ElementsAre;
 using ::testing::SizeIs;
+using ::testing::UnorderedElementsAre;
 
 // The default maximum size of the Reassembly Queue.
 static constexpr size_t kBufferSize = 10000;
@@ -45,6 +46,11 @@
 
 static constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4};
 static constexpr std::array<uint8_t, 4> kMessage2Payload = {5, 6, 7, 8};
+static constexpr std::array<uint8_t, 6> kSixBytePayload = {1, 2, 3, 4, 5, 6};
+static constexpr std::array<uint8_t, 8> kMediumPayload1 = {1, 2, 3, 4,
+                                                           5, 6, 7, 8};
+static constexpr std::array<uint8_t, 8> kMediumPayload2 = {9,  10, 11, 12,
+                                                           13, 14, 15, 16};
 static constexpr std::array<uint8_t, 16> kLongPayload = {
     1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
 
@@ -369,7 +375,8 @@
   DcSctpSocketHandoverState state;
   reasm1.AddHandoverState(state);
   g_handover_state_transformer_for_test(&state);
-  ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
+  ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize,
+                         /*use_message_interleaving=*/false, &state);
 
   reasm2.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
   EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
@@ -384,7 +391,8 @@
   DcSctpSocketHandoverState state;
   reasm1.AddHandoverState(state);
   g_handover_state_transformer_for_test(&state);
-  ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
+  ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize,
+                         /*use_message_interleaving=*/false, &state);
 
   reasm2.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE"));
   EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
@@ -405,5 +413,95 @@
   // Don't assemble SSN=7, as that TSN is skipped.
   EXPECT_FALSE(reasm.HasMessages());
 }
+
+TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
+                        /*use_message_interleaving=*/true);
+  reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
+                          {1, 2, 3, 4}, Data::IsBeginning(true),
+                          Data::IsEnd(true), IsUnordered(true)));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
+}
+
+TEST_F(ReassemblyQueueTest, TwoInterleavedChunks) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
+                        /*use_message_interleaving=*/true);
+  reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
+                          {1, 2, 3, 4}, Data::IsBeginning(true),
+                          Data::IsEnd(false), IsUnordered(true)));
+  reasm.Add(TSN(11), Data(StreamID(2), SSN(0), MID(0), FSN(0), kPPID,
+                          {9, 10, 11, 12}, Data::IsBeginning(true),
+                          Data::IsEnd(false), IsUnordered(true)));
+  EXPECT_EQ(reasm.queued_bytes(), 8u);
+  reasm.Add(TSN(12), Data(StreamID(1), SSN(0), MID(0), FSN(1), kPPID,
+                          {5, 6, 7, 8}, Data::IsBeginning(false),
+                          Data::IsEnd(true), IsUnordered(true)));
+  EXPECT_EQ(reasm.queued_bytes(), 4u);
+  reasm.Add(TSN(13), Data(StreamID(2), SSN(0), MID(0), FSN(1), kPPID,
+                          {13, 14, 15, 16}, Data::IsBeginning(false),
+                          Data::IsEnd(true), IsUnordered(true)));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(StreamID(1), kPPID, kMediumPayload1),
+                          SctpMessageIs(StreamID(2), kPPID, kMediumPayload2)));
+}
+
+TEST_F(ReassemblyQueueTest, UnorderedInterleavedMessagesAllPermutations) {
+  std::vector<int> indexes = {0, 1, 2, 3, 4, 5};
+  TSN tsns[] = {TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15)};
+  StreamID stream_ids[] = {StreamID(1), StreamID(2), StreamID(1),
+                           StreamID(1), StreamID(2), StreamID(2)};
+  FSN fsns[] = {FSN(0), FSN(0), FSN(1), FSN(2), FSN(1), FSN(2)};
+  rtc::ArrayView<const uint8_t> payload(kSixBytePayload);
+  do {
+    ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
+                          /*use_message_interleaving=*/true);
+    for (int i : indexes) {
+      auto span = payload.subview(*fsns[i] * 2, 2);
+      Data::IsBeginning is_beginning(fsns[i] == FSN(0));
+      Data::IsEnd is_end(fsns[i] == FSN(2));
+      reasm.Add(tsns[i], Data(stream_ids[i], SSN(0), MID(0), fsns[i], kPPID,
+                              std::vector<uint8_t>(span.begin(), span.end()),
+                              is_beginning, is_end, IsUnordered(true)));
+    }
+    EXPECT_TRUE(reasm.HasMessages());
+    EXPECT_THAT(reasm.FlushMessages(),
+                UnorderedElementsAre(
+                    SctpMessageIs(StreamID(1), kPPID, kSixBytePayload),
+                    SctpMessageIs(StreamID(2), kPPID, kSixBytePayload)));
+    EXPECT_EQ(reasm.queued_bytes(), 0u);
+  } while (std::next_permutation(std::begin(indexes), std::end(indexes)));
+}
+
+TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
+                        /*use_message_interleaving=*/true);
+  reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
+  gen_.Ordered({2}, "");
+  reasm.Add(TSN(12), gen_.Ordered({3}, ""));
+  reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
+  reasm.Add(TSN(15), gen_.Ordered({5}, "B"));
+  reasm.Add(TSN(16), gen_.Ordered({6}, ""));
+  reasm.Add(TSN(17), gen_.Ordered({7}, ""));
+  reasm.Add(TSN(18), gen_.Ordered({8}, "E"));
+
+  ASSERT_FALSE(reasm.HasMessages());
+  EXPECT_EQ(reasm.queued_bytes(), 7u);
+
+  reasm.Handle(
+      IForwardTsnChunk(TSN(13), {IForwardTsnChunk::SkippedStream(
+                                    IsUnordered(false), kStreamID, MID(0))}));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+
+  // The lost chunk comes, but too late.
+  ASSERT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
+}
+
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/rx/reassembly_streams.cc b/net/dcsctp/rx/reassembly_streams.cc
new file mode 100644
index 0000000..9fd52fb
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_streams.cc
@@ -0,0 +1,55 @@
+/*
+ *  Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/reassembly_streams.h"
+
+#include <cstddef>
+#include <map>
+#include <utility>
+
+namespace dcsctp {
+
+ReassembledMessage AssembleMessage(std::map<UnwrappedTSN, Data>::iterator start,
+                                   std::map<UnwrappedTSN, Data>::iterator end) {
+  size_t count = std::distance(start, end);
+
+  if (count == 1) {
+    // Fast path - zero-copy
+    Data& data = start->second;
+
+    return ReassembledMessage{
+        .tsns = {start->first},
+        .message = DcSctpMessage(data.stream_id, data.ppid,
+                                 std::move(start->second.payload)),
+    };
+  }
+
+  // Slow path - will need to concatenate the payload.
+  std::vector<UnwrappedTSN> tsns;
+  std::vector<uint8_t> payload;
+
+  size_t payload_size = std::accumulate(
+      start, end, 0,
+      [](size_t v, const auto& p) { return v + p.second.size(); });
+
+  tsns.reserve(count);
+  payload.reserve(payload_size);
+  for (auto it = start; it != end; ++it) {
+    Data& data = it->second;
+    tsns.push_back(it->first);
+    payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+  }
+
+  return ReassembledMessage{
+      .tsns = std::move(tsns),
+      .message = DcSctpMessage(start->second.stream_id, start->second.ppid,
+                               std::move(payload)),
+  };
+}
+}  // namespace dcsctp
diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h
index 41a79ea..038ad36 100644
--- a/net/dcsctp/socket/transmission_control_block.h
+++ b/net/dcsctp/socket/transmission_control_block.h
@@ -98,6 +98,7 @@
         reassembly_queue_(log_prefix,
                           peer_initial_tsn,
                           options.max_receiver_window_buffer_size,
+                          capabilities.message_interleaving,
                           handover_state),
         retransmission_queue_(
             log_prefix,