dcsctp: Add Traditional Reassembly Streams

This class handles the assembly of fragmented received messages (as DATA
chunks) and manage per-stream queues. This class only handles
non-interleaved messages as described in RFC4960, and is not used when
message interleaving is enabled on the association, as described in
RFC8260.

This is also only part of the reassembly - a follow-up change will add
the ReassemblyQueue that handle the other part as well. And an even
further follow-up change will add a "interleaved reassembly stream".

Bug: webrtc:12614
Change-Id: Iaf339fa215a2b14926f5cb74f15528392e273f99
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214042
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33677}
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index 224c546..6c26e11 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -21,18 +21,42 @@
   ]
 }
 
+rtc_source_set("reassembly_streams") {
+  deps = [ "../packet:chunk" ]
+  sources = [ "reassembly_streams.h" ]
+}
+
+rtc_library("traditional_reassembly_streams") {
+  deps = [
+    ":reassembly_streams",
+    "../../../api:array_view",
+    "../../../rtc_base",
+    "../../../rtc_base:checks",
+    "../../../rtc_base:rtc_base_approved",
+  ]
+  sources = [
+    "traditional_reassembly_streams.cc",
+    "traditional_reassembly_streams.h",
+  ]
+}
+
 if (rtc_include_tests) {
   rtc_library("dcsctp_rx_unittests") {
     testonly = true
 
     deps = [
       ":data_tracker",
+      ":traditional_reassembly_streams",
       "../../../api:array_view",
       "../../../rtc_base:checks",
       "../../../rtc_base:gunit_helpers",
       "../../../rtc_base:rtc_base_approved",
       "../../../test:test_support",
+      "../testing:data_generator",
     ]
-    sources = [ "data_tracker_test.cc" ]
+    sources = [
+      "data_tracker_test.cc",
+      "traditional_reassembly_streams_test.cc",
+    ]
   }
 }
diff --git a/net/dcsctp/rx/reassembly_streams.h b/net/dcsctp/rx/reassembly_streams.h
new file mode 100644
index 0000000..a8b42b5
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_streams.h
@@ -0,0 +1,84 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
+#define NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+
+namespace dcsctp {
+
+// Implementations of this interface will be called when data is received, when
+// data should be skipped/forgotten or when sequence number should be reset.
+//
+// As a result of these operations - mainly when data is received - the
+// implementations of this interface should notify when a message has been
+// assembled, by calling the provided callback of type `OnAssembledMessage`. How
+// it assembles messages will depend on e.g. if a message was sent on an ordered
+// or unordered stream.
+//
+// Implementations will - for each operation - indicate how much additional
+// memory that has been used as a result of performing the operation. This is
+// used to limit the maximum amount of memory used, to prevent out-of-memory
+// situations.
+class ReassemblyStreams {
+ public:
+  // This callback will be provided as an argument to the constructor of the
+  // concrete class implementing this interface and should be called when a
+  // message has been assembled as well as indicating from which TSNs this
+  // message was assembled from.
+  using OnAssembledMessage =
+      std::function<void(rtc::ArrayView<const UnwrappedTSN> tsns,
+                         DcSctpMessage message)>;
+
+  virtual ~ReassemblyStreams() = default;
+
+  // Adds a data chunk to a stream as identified in `data`.
+  // If it was the last remaining chunk in a message, reassemble one (or
+  // several, in case of ordered chunks) messages.
+  //
+  // Returns the additional number of bytes added to the queue as a result of
+  // performing this operation. If this addition resulted in messages being
+  // assembled and delivered, this may be negative.
+  virtual int Add(UnwrappedTSN tsn, Data data) = 0;
+
+  // Called for incoming FORWARD-TSN/I-FORWARD-TSN chunks - when the sender
+  // wishes the received to skip/forget about data up until the provided TSN.
+  // This is used to implement partial reliability, such as limiting the number
+  // of retransmissions or the an expiration duration. As a result of skipping
+  // data, this may result in the implementation being able to assemble messages
+  // in ordered streams.
+  //
+  // Returns the number of bytes removed from the queue as a result of
+  // this operation.
+  virtual size_t HandleForwardTsn(
+      UnwrappedTSN new_cumulative_ack_tsn,
+      rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream>
+          skipped_streams) = 0;
+
+  // Called for incoming (possibly deferred) RE_CONFIG chunks asking for
+  // either a few streams, or all streams (when the list is empty) to be
+  // reset - to have their next SSN or Message ID to be zero.
+  virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0;
+};
+
+}  // namespace dcsctp
+
+#endif  // NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc
new file mode 100644
index 0000000..caa97d2
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams.cc
@@ -0,0 +1,289 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+
+#include <stddef.h>
+
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <numeric>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+namespace {
+
+// Given a map (`chunks`) and an iterator to within that map (`iter`), this
+// function will return an iterator to the first chunk in that message, which
+// has the `is_beginning` flag set. If there are any gaps, or if the beginning
+// can't be found, `absl::nullopt` is returned.
+absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindBeginning(
+    const std::map<UnwrappedTSN, Data>& chunks,
+    std::map<UnwrappedTSN, Data>::iterator iter) {
+  UnwrappedTSN prev_tsn = iter->first;
+  for (;;) {
+    if (iter->second.is_beginning) {
+      return iter;
+    }
+    if (iter == chunks.begin()) {
+      return absl::nullopt;
+    }
+    --iter;
+    if (iter->first.next_value() != prev_tsn) {
+      return absl::nullopt;
+    }
+    prev_tsn = iter->first;
+  }
+}
+
+// Given a map (`chunks`) and an iterator to within that map (`iter`), this
+// function will return an iterator to the chunk after the last chunk in that
+// message, which has the `is_end` flag set. If there are any gaps, or if the
+// end can't be found, `absl::nullopt` is returned.
+absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindEnd(
+    std::map<UnwrappedTSN, Data>& chunks,
+    std::map<UnwrappedTSN, Data>::iterator iter) {
+  UnwrappedTSN prev_tsn = iter->first;
+  for (;;) {
+    if (iter->second.is_end) {
+      return ++iter;
+    }
+    ++iter;
+    if (iter == chunks.end()) {
+      return absl::nullopt;
+    }
+    if (iter->first != prev_tsn.next_value()) {
+      return absl::nullopt;
+    }
+    prev_tsn = iter->first;
+  }
+}
+}  // namespace
+
+int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
+                                                       Data data) {
+  int queued_bytes = data.size();
+  auto p = chunks_.emplace(tsn, std::move(data));
+  if (!p.second /* !inserted */) {
+    return 0;
+  }
+
+  queued_bytes -= TryToAssembleMessage(p.first);
+
+  return queued_bytes;
+}
+
+size_t TraditionalReassemblyStreams::UnorderedStream::TryToAssembleMessage(
+    ChunkMap::iterator iter) {
+  // TODO(boivie): This method is O(N) with the number of fragments in a
+  // message, which can be inefficient for very large values of N. This could be
+  // optimized by e.g. only trying to assemble a message once _any_ beginning
+  // and _any_ end has been found.
+  absl::optional<ChunkMap::iterator> start = FindBeginning(chunks_, iter);
+  if (!start.has_value()) {
+    return 0;
+  }
+  absl::optional<ChunkMap::iterator> end = FindEnd(chunks_, iter);
+  if (!end.has_value()) {
+    return 0;
+  }
+
+  size_t bytes_assembled = AssembleMessage(*start, *end);
+  chunks_.erase(*start, *end);
+  return bytes_assembled;
+}
+
+size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
+    const ChunkMap::iterator start,
+    const ChunkMap::iterator end) {
+  size_t count = std::distance(start, end);
+
+  if (count == 1) {
+    // Fast path - zero-copy
+    const Data& data = start->second;
+    size_t payload_size = start->second.size();
+    UnwrappedTSN tsns[1] = {start->first};
+    DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+    parent_.on_assembled_message_(tsns, std::move(message));
+    return payload_size;
+  }
+
+  // Slow path - will need to concatenate the payload.
+  std::vector<UnwrappedTSN> tsns;
+  std::vector<uint8_t> payload;
+
+  size_t payload_size = std::accumulate(
+      start, end, 0,
+      [](size_t v, const auto& p) { return v + p.second.size(); });
+
+  tsns.reserve(count);
+  payload.reserve(payload_size);
+  for (auto it = start; it != end; ++it) {
+    const Data& data = it->second;
+    tsns.push_back(it->first);
+    payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+  }
+
+  DcSctpMessage message(start->second.stream_id, start->second.ppid,
+                        std::move(payload));
+  parent_.on_assembled_message_(tsns, std::move(message));
+
+  return payload_size;
+}
+
+size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo(
+    UnwrappedTSN tsn) {
+  auto end_iter = chunks_.upper_bound(tsn);
+  size_t removed_bytes = std::accumulate(
+      chunks_.begin(), end_iter, 0,
+      [](size_t r, const auto& p) { return r + p.second.size(); });
+
+  chunks_.erase(chunks_.begin(), end_iter);
+  return removed_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessage() {
+  if (chunks_by_ssn_.empty() || chunks_by_ssn_.begin()->first != next_ssn_) {
+    return 0;
+  }
+
+  ChunkMap& chunks = chunks_by_ssn_.begin()->second;
+
+  if (!chunks.begin()->second.is_beginning || !chunks.rbegin()->second.is_end) {
+    return 0;
+  }
+
+  uint32_t tsn_diff = chunks.rbegin()->first.Difference(chunks.begin()->first);
+  if (tsn_diff != chunks.size() - 1) {
+    return 0;
+  }
+
+  size_t assembled_bytes = AssembleMessage(chunks.begin(), chunks.end());
+  chunks_by_ssn_.erase(chunks_by_ssn_.begin());
+  next_ssn_.Increment();
+  return assembled_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessages() {
+  size_t assembled_bytes = 0;
+
+  for (;;) {
+    size_t assembled_bytes_this_iter = TryToAssembleMessage();
+    if (assembled_bytes_this_iter == 0) {
+      break;
+    }
+    assembled_bytes += assembled_bytes_this_iter;
+  }
+  return assembled_bytes;
+}
+
+int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn,
+                                                     Data data) {
+  int queued_bytes = data.size();
+
+  UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn);
+  auto p = chunks_by_ssn_[ssn].emplace(tsn, std::move(data));
+  if (!p.second /* !inserted */) {
+    return 0;
+  }
+
+  if (ssn == next_ssn_) {
+    queued_bytes -= TryToAssembleMessages();
+  }
+
+  return queued_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::EraseTo(SSN ssn) {
+  UnwrappedSSN unwrapped_ssn = ssn_unwrapper_.Unwrap(ssn);
+
+  auto end_iter = chunks_by_ssn_.upper_bound(unwrapped_ssn);
+  size_t removed_bytes = std::accumulate(
+      chunks_by_ssn_.begin(), end_iter, 0, [](size_t r1, const auto& p) {
+        return r1 +
+               absl::c_accumulate(p.second, 0, [](size_t r2, const auto& q) {
+                 return r2 + q.second.size();
+               });
+      });
+  chunks_by_ssn_.erase(chunks_by_ssn_.begin(), end_iter);
+
+  if (unwrapped_ssn >= next_ssn_) {
+    unwrapped_ssn.Increment();
+    next_ssn_ = unwrapped_ssn;
+  }
+
+  removed_bytes += TryToAssembleMessages();
+  return removed_bytes;
+}
+
+int TraditionalReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
+  if (data.is_unordered) {
+    auto it = unordered_streams_.emplace(data.stream_id, this).first;
+    return it->second.Add(tsn, std::move(data));
+  }
+
+  auto it = ordered_streams_.emplace(data.stream_id, this).first;
+  return it->second.Add(tsn, std::move(data));
+}
+
+size_t TraditionalReassemblyStreams::HandleForwardTsn(
+    UnwrappedTSN new_cumulative_ack_tsn,
+    rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+  size_t bytes_removed = 0;
+  // The `skipped_streams` only over ordered messages - need to
+  // iterate all unordered streams manually to remove those chunks.
+  for (auto& entry : unordered_streams_) {
+    bytes_removed += entry.second.EraseTo(new_cumulative_ack_tsn);
+  }
+
+  for (const auto& skipped_stream : skipped_streams) {
+    auto it = ordered_streams_.find(skipped_stream.stream_id);
+    if (it != ordered_streams_.end()) {
+      bytes_removed += it->second.EraseTo(skipped_stream.ssn);
+    }
+  }
+
+  return bytes_removed;
+}
+
+void TraditionalReassemblyStreams::ResetStreams(
+    rtc::ArrayView<const StreamID> stream_ids) {
+  if (stream_ids.empty()) {
+    for (auto& entry : ordered_streams_) {
+      const StreamID& stream_id = entry.first;
+      OrderedStream& stream = entry.second;
+      RTC_DLOG(LS_VERBOSE) << log_prefix_
+                           << "Resetting implicit stream_id=" << *stream_id;
+      stream.Reset();
+    }
+  } else {
+    for (StreamID stream_id : stream_ids) {
+      auto it = ordered_streams_.find(stream_id);
+      if (it != ordered_streams_.end()) {
+        RTC_DLOG(LS_VERBOSE)
+            << log_prefix_ << "Resetting explicit stream_id=" << *stream_id;
+        it->second.Reset();
+      }
+    }
+  }
+}
+}  // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.h b/net/dcsctp/rx/traditional_reassembly_streams.h
new file mode 100644
index 0000000..12d1d933
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams.h
@@ -0,0 +1,119 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
+#define NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
+#include <stddef.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+#include <unordered_map>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+
+namespace dcsctp {
+
+// Handles reassembly of incoming data when interleaved message sending
+// is not enabled on the association, i.e. when RFC8260 is not in use and
+// RFC4960 is to be followed.
+class TraditionalReassemblyStreams : public ReassemblyStreams {
+ public:
+  TraditionalReassemblyStreams(absl::string_view log_prefix,
+                               OnAssembledMessage on_assembled_message)
+      : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
+
+  int Add(UnwrappedTSN tsn, Data data) override;
+
+  size_t HandleForwardTsn(
+      UnwrappedTSN new_cumulative_ack_tsn,
+      rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)
+      override;
+
+  void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
+
+ private:
+  using ChunkMap = std::map<UnwrappedTSN, Data>;
+
+  // Base class for `UnorderedStream` and `OrderedStream`.
+  class StreamBase {
+   protected:
+    explicit StreamBase(TraditionalReassemblyStreams* parent)
+        : parent_(*parent) {}
+
+    size_t AssembleMessage(const ChunkMap::iterator start,
+                           const ChunkMap::iterator end);
+    TraditionalReassemblyStreams& parent_;
+  };
+
+  // Manages all received data for a specific unordered stream, and assembles
+  // messages when possible.
+  class UnorderedStream : StreamBase {
+   public:
+    explicit UnorderedStream(TraditionalReassemblyStreams* parent)
+        : StreamBase(parent) {}
+    int Add(UnwrappedTSN tsn, Data data);
+    // Returns the number of bytes removed from the queue.
+    size_t EraseTo(UnwrappedTSN tsn);
+
+   private:
+    // Given an iterator to any chunk within the map, try to assemble a message
+    // into `reassembled_messages` containing it and - if successful - erase
+    // those chunks from the stream chunks map.
+    //
+    // Returns the number of bytes that were assembled.
+    size_t TryToAssembleMessage(ChunkMap::iterator iter);
+
+    ChunkMap chunks_;
+  };
+
+  // Manages all received data for a specific ordered stream, and assembles
+  // messages when possible.
+  class OrderedStream : StreamBase {
+   public:
+    explicit OrderedStream(TraditionalReassemblyStreams* parent)
+        : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {}
+    int Add(UnwrappedTSN tsn, Data data);
+    size_t EraseTo(SSN ssn);
+    void Reset() {
+      ssn_unwrapper_.Reset();
+      next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0));
+    }
+
+   private:
+    // Try to assemble one or several messages in order from the stream.
+    // Returns the number of bytes assembled if a message was assembled.
+    size_t TryToAssembleMessage();
+    size_t TryToAssembleMessages();
+    // This must be an ordered container to be able to iterate in SSN order.
+    std::map<UnwrappedSSN, ChunkMap> chunks_by_ssn_;
+    UnwrappedSSN::Unwrapper ssn_unwrapper_;
+    UnwrappedSSN next_ssn_;
+  };
+
+  const std::string log_prefix_;
+
+  // Callback for when a message has been assembled.
+  const OnAssembledMessage on_assembled_message_;
+
+  // All unordered and ordered streams, managing not-yet-assembled data.
+  std::unordered_map<StreamID, UnorderedStream, StreamID::Hasher>
+      unordered_streams_;
+  std::unordered_map<StreamID, OrderedStream, StreamID::Hasher>
+      ordered_streams_;
+};
+
+}  // namespace dcsctp
+
+#endif  // NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
new file mode 100644
index 0000000..30d29a0
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
@@ -0,0 +1,152 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+#include "net/dcsctp/testing/data_generator.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::MockFunction;
+using ::testing::NiceMock;
+
+class TraditionalReassemblyStreamsTest : public testing::Test {
+ protected:
+  UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); }
+
+  TraditionalReassemblyStreamsTest() {}
+  DataGenerator gen_;
+  UnwrappedTSN::Unwrapper tsn_;
+};
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       AddUnorderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+  // Adding the end fragment should make it empty again.
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       AddSimpleOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       AddMoreComplexOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  Data late = gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       DeleteUnorderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(3), {}), 6u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       DeleteSimpleOrderedMessageReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+
+  ForwardTsnChunk::SkippedStream skipped[] = {
+      ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       DeleteManyOrderedMessagesReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+  // Expire all three messages
+  ForwardTsnChunk::SkippedStream skipped[] = {
+      ForwardTsnChunk::SkippedStream(StreamID(1), SSN(2))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+       DeleteOrderedMessageDelivesTwoReturnsCorrectSize) {
+  NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+  TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+  EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+  gen_.Ordered({2, 3, 4});
+  EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+  EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+  EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+  EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+  EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+  // The first ordered message expire, and the following two are delivered.
+  ForwardTsnChunk::SkippedStream skipped[] = {
+      ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
+  EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
+}
+
+}  // namespace
+}  // namespace dcsctp