dcsctp: Add Reassembly Queue

The Reassembly Queue receives fragmented messages (DATA or I-DATA
chunks) and - with help of stream reassemblers - will reassemble these
fragments into messages, which will be delivered to the client.

It also handle partial reliability (FORWARD-TSN) and stream resetting.

To avoid a DoS attack vector, where a sender can send fragments in a way
that the reassembly queue will never succeed to reassemble a message and
use all available memory, the ReassemblyQueue has a maximum size.

Bug: webrtc:12614
Change-Id: Ibb084fecd240d4c414e096579244f8f5ee46914e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214043
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33678}
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index 6c26e11..75312b9 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -40,12 +40,27 @@
   ]
 }
 
+rtc_library("reassembly_queue") {
+  deps = [
+    ":traditional_reassembly_streams",
+    "../../../api:array_view",
+    "../../../rtc_base",
+    "../../../rtc_base:checks",
+    "../../../rtc_base:rtc_base_approved",
+  ]
+  sources = [
+    "reassembly_queue.cc",
+    "reassembly_queue.h",
+  ]
+}
+
 if (rtc_include_tests) {
   rtc_library("dcsctp_rx_unittests") {
     testonly = true
 
     deps = [
       ":data_tracker",
+      ":reassembly_queue",
       ":traditional_reassembly_streams",
       "../../../api:array_view",
       "../../../rtc_base:checks",
@@ -56,6 +71,7 @@
     ]
     sources = [
       "data_tracker_test.cc",
+      "reassembly_queue_test.cc",
       "traditional_reassembly_streams_test.cc",
     ]
   }
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc
new file mode 100644
index 0000000..581b9fc
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_queue.cc
@@ -0,0 +1,245 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/reassembly_queue.h"
+
+#include <stddef.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/common/str_join.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
+#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
+                                 TSN peer_initial_tsn,
+                                 size_t max_size_bytes)
+    : log_prefix_(std::string(log_prefix) + "reasm: "),
+      max_size_bytes_(max_size_bytes),
+      watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
+      last_assembled_tsn_watermark_(
+          tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
+      streams_(std::make_unique<TraditionalReassemblyStreams>(
+          log_prefix_,
+          [this](rtc::ArrayView<const UnwrappedTSN> tsns,
+                 DcSctpMessage message) {
+            AddReassembledMessage(tsns, std::move(message));
+          })) {}
+
+void ReassemblyQueue::Add(TSN tsn, Data data) {
+  RTC_DCHECK(IsConsistent());
+  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
+                       << ", stream=" << *data.stream_id << ":"
+                       << *data.message_id << ":" << *data.fsn << ", type="
+                       << (data.is_beginning && data.is_end
+                               ? "complete"
+                               : data.is_beginning
+                                     ? "first"
+                                     : data.is_end ? "last" : "middle");
+
+  UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);
+
+  if (unwrapped_tsn <= last_assembled_tsn_watermark_ ||
+      delivered_tsns_.find(unwrapped_tsn) != delivered_tsns_.end()) {
+    RTC_DLOG(LS_VERBOSE) << log_prefix_
+                         << "Chunk has already been delivered - skipping";
+    return;
+  }
+
+  // If a stream reset has been received with a "sender's last assigned tsn" in
+  // the future, the socket is in "deferred reset processing" mode and must
+  // buffer chunks until it's exited.
+  if (deferred_reset_streams_.has_value() &&
+      unwrapped_tsn >
+          tsn_unwrapper_.Unwrap(
+              deferred_reset_streams_->req.sender_last_assigned_tsn())) {
+    RTC_DLOG(LS_VERBOSE)
+        << log_prefix_ << "Deferring chunk with tsn=" << *tsn
+        << " until cum_ack_tsn="
+        << *deferred_reset_streams_->req.sender_last_assigned_tsn();
+    // https://tools.ietf.org/html/rfc6525#section-5.2.2
+    // "In this mode, any data arriving with a TSN larger than the
+    // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
+    // locally and held until the cumulative acknowledgment point reaches the
+    // Sender's Last Assigned TSN."
+    queued_bytes_ += data.size();
+    deferred_reset_streams_->deferred_chunks.emplace_back(
+        std::make_pair(tsn, std::move(data)));
+  } else {
+    queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
+  }
+
+  // https://tools.ietf.org/html/rfc4960#section-6.9
+  // "Note: If the data receiver runs out of buffer space while still
+  // waiting for more fragments to complete the reassembly of the message, it
+  // should dispatch part of its inbound message through a partial delivery
+  // API (see Section 10), freeing some of its receive buffer space so that
+  // the rest of the message may be received."
+
+  // TODO(boivie): Support EOR flag and partial delivery?
+  RTC_DCHECK(IsConsistent());
+}
+
+ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
+    const OutgoingSSNResetRequestParameter& req,
+    TSN cum_tsn_ack) {
+  RTC_DCHECK(IsConsistent());
+  if (deferred_reset_streams_.has_value()) {
+    // In deferred mode already.
+    return ReconfigurationResponseParameter::Result::kInProgress;
+  } else if (req.request_sequence_number() <=
+             last_completed_reset_req_seq_nbr_) {
+    // Already performed at some time previously.
+    return ReconfigurationResponseParameter::Result::kSuccessPerformed;
+  }
+
+  UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
+  UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
+
+  // https://tools.ietf.org/html/rfc6525#section-5.2.2
+  // "If the Sender's Last Assigned TSN is greater than the
+  // cumulative acknowledgment point, then the endpoint MUST enter "deferred
+  // reset processing"."
+  if (sla_tsn > unwrapped_cum_tsn_ack) {
+    RTC_DLOG(LS_VERBOSE)
+        << log_prefix_
+        << "Entering deferred reset processing mode until cum_tsn_ack="
+        << *req.sender_last_assigned_tsn();
+    deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
+    return ReconfigurationResponseParameter::Result::kInProgress;
+  }
+
+  // https://tools.ietf.org/html/rfc6525#section-5.2.2
+  // "... streams MUST be reset to 0 as the next expected SSN."
+  streams_->ResetStreams(req.stream_ids());
+  last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
+  RTC_DCHECK(IsConsistent());
+  return ReconfigurationResponseParameter::Result::kSuccessPerformed;
+}
+
+bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
+  RTC_DCHECK(IsConsistent());
+  if (deferred_reset_streams_.has_value()) {
+    UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
+    UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
+        deferred_reset_streams_->req.sender_last_assigned_tsn());
+    if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
+      RTC_DLOG(LS_VERBOSE) << log_prefix_
+                           << "Leaving deferred reset processing with tsn="
+                           << *cum_ack_tsn << ", feeding back "
+                           << deferred_reset_streams_->deferred_chunks.size()
+                           << " chunks";
+      // https://tools.ietf.org/html/rfc6525#section-5.2.2
+      // "... streams MUST be reset to 0 as the next expected SSN."
+      streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
+      std::vector<std::pair<TSN, Data>> deferred_chunks =
+          std::move(deferred_reset_streams_->deferred_chunks);
+      // The response will not be sent now, but as a reply to the retried
+      // request, which will come as "in progress" has been sent prior.
+      last_completed_reset_req_seq_nbr_ =
+          deferred_reset_streams_->req.request_sequence_number();
+      deferred_reset_streams_ = absl::nullopt;
+
+      // https://tools.ietf.org/html/rfc6525#section-5.2.2
+      // "Any queued TSNs (queued at step E2) MUST now be released and processed
+      // normally."
+      for (auto& p : deferred_chunks) {
+        const TSN& tsn = p.first;
+        Data& data = p.second;
+        queued_bytes_ -= data.size();
+        Add(tsn, std::move(data));
+      }
+
+      RTC_DCHECK(IsConsistent());
+      return true;
+    } else {
+      RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
+                           << *cum_ack_tsn;
+    }
+  }
+
+  return false;
+}
+
+std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
+  std::vector<DcSctpMessage> ret;
+  reassembled_messages_.swap(ret);
+  return ret;
+}
+
+void ReassemblyQueue::AddReassembledMessage(
+    rtc::ArrayView<const UnwrappedTSN> tsns,
+    DcSctpMessage message) {
+  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
+                       << StrJoin(tsns, ",",
+                                  [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
+                                    sb << *tsn.Wrap();
+                                  })
+                       << "], message; stream_id=" << *message.stream_id()
+                       << ", ppid=" << *message.ppid()
+                       << ", payload=" << message.payload().size() << " bytes";
+
+  for (const UnwrappedTSN tsn : tsns) {
+    // Update watermark, or insert into delivered_tsns_
+    if (tsn == last_assembled_tsn_watermark_.next_value()) {
+      last_assembled_tsn_watermark_.Increment();
+    } else {
+      delivered_tsns_.insert(tsn);
+    }
+  }
+
+  // With new TSNs in delivered_tsns, gaps might be filled.
+  while (!delivered_tsns_.empty() &&
+         *delivered_tsns_.begin() ==
+             last_assembled_tsn_watermark_.next_value()) {
+    last_assembled_tsn_watermark_.Increment();
+    delivered_tsns_.erase(delivered_tsns_.begin());
+  }
+
+  reassembled_messages_.emplace_back(std::move(message));
+}
+
+void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
+  RTC_DCHECK(IsConsistent());
+  UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
+
+  last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
+  delivered_tsns_.erase(delivered_tsns_.begin(),
+                        delivered_tsns_.upper_bound(tsn));
+
+  queued_bytes_ -=
+      streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
+  RTC_DCHECK(IsConsistent());
+}
+
+bool ReassemblyQueue::IsConsistent() const {
+  // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
+  // enforced in this class. This comparison will still trigger if queued_bytes_
+  // became "negative".
+  return (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
+}
+
+}  // namespace dcsctp
diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h
new file mode 100644
index 0000000..b752e53
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_queue.h
@@ -0,0 +1,163 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
+#define NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
+
+#include <stddef.h>
+
+#include <cstdint>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/internal_types.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
+#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+
+namespace dcsctp {
+
+// Contains the received DATA chunks that haven't yet been reassembled, and
+// reassembles chunks when possible.
+//
+// The actual assembly is handled by an implementation of the
+// `ReassemblyStreams` interface.
+//
+// Except for reassembling fragmented messages, this class will also handle two
+// less common operations; To handle the receiver-side of partial reliability
+// (limited number of retransmissions or limited message lifetime) as well as
+// stream resetting, which is used when a sender wishes to close a data channel.
+//
+// Partial reliability is handled when a FORWARD-TSN or I-FORWARD-TSN chunk is
+// received, and it will simply delete any chunks matching the parameters in
+// that chunk. This is mainly implemented in ReassemblyStreams.
+//
+// Resetting streams is handled when a RECONFIG chunks is received, with an
+// "Outgoing SSN Reset Request" parameter. That parameter will contain a list of
+// streams to reset, and a `sender_last_assigned_tsn`. If this TSN is not yet
+// seen, the stream cannot be directly reset, and this class will respond that
+// the reset is "deferred". But if this TSN provided is known, the stream can be
+// immediately be reset.
+//
+// The ReassemblyQueue has a maximum size, as it would otherwise be an DoS
+// attack vector where a peer could consume all memory of the other peer by
+// sending a lot of ordered chunks, but carefully withholding an early one. It
+// also has a watermark limit, which the caller can query is the number of bytes
+// is above that limit. This is used by the caller to be selective in what to
+// add to the reassembly queue, so that it's not exhausted. The caller is
+// expected to call `is_full` prior to adding data to the queue and to act
+// accordingly if the queue is full.
+class ReassemblyQueue {
+ public:
+  // When the queue is filled over this fraction (of its maximum size), the
+  // socket should restrict incoming data to avoid filling up the queue.
+  static constexpr float kHighWatermarkLimit = 0.9;
+
+  ReassemblyQueue(absl::string_view log_prefix,
+                  TSN peer_initial_tsn,
+                  size_t max_size_bytes);
+
+  // Adds a data chunk to the queue, with a `tsn` and other parameters in
+  // `data`.
+  void Add(TSN tsn, Data data);
+
+  // Indicates if the reassembly queue has any reassembled messages that can be
+  // retrieved by calling `FlushMessages`.
+  bool HasMessages() const { return !reassembled_messages_.empty(); }
+
+  // Returns any reassembled messages.
+  std::vector<DcSctpMessage> FlushMessages();
+
+  // Handle a ForwardTSN chunk, when the sender has indicated that the received
+  // (this class) should forget about some chunks. This is used to implement
+  // partial reliability.
+  void Handle(const AnyForwardTsnChunk& forward_tsn);
+
+  // Given the reset stream request and the current cum_tsn_ack, might either
+  // reset the streams directly (returns kSuccessPerformed), or at a later time,
+  // by entering the "deferred reset processing" mode (returns kInProgress).
+  ReconfigurationResponseParameter::Result ResetStreams(
+      const OutgoingSSNResetRequestParameter& req,
+      TSN cum_tsn_ack);
+
+  // Given the current (updated) cum_tsn_ack, might leave "defererred reset
+  // processing" mode and reset streams. Returns true if so.
+  bool MaybeResetStreamsDeferred(TSN cum_ack_tsn);
+
+  // The number of payload bytes that have been queued. Note that the actual
+  // memory usage is higher due to additional overhead of tracking received
+  // data.
+  size_t queued_bytes() const { return queued_bytes_; }
+
+  // The remaining bytes until the queue is full.
+  size_t remaining_bytes() const { return max_size_bytes_ - queued_bytes_; }
+
+  // Indicates if the queue is full. Data should not be added to the queue when
+  // it's full.
+  bool is_full() const { return queued_bytes_ >= max_size_bytes_; }
+
+  // Indicates if the queue is above the watermark limit, which is a certain
+  // percentage of its size.
+  bool is_above_watermark() const { return queued_bytes_ >= watermark_bytes_; }
+
+  // Returns the watermark limit, in bytes.
+  size_t watermark_bytes() const { return watermark_bytes_; }
+
+ private:
+  bool IsConsistent() const;
+  void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
+                             DcSctpMessage message);
+
+  struct DeferredResetStreams {
+    explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req)
+        : req(std::move(req)) {}
+    OutgoingSSNResetRequestParameter req;
+    std::vector<std::pair<TSN, Data>> deferred_chunks;
+  };
+
+  const std::string log_prefix_;
+  const size_t max_size_bytes_;
+  const size_t watermark_bytes_;
+  UnwrappedTSN::Unwrapper tsn_unwrapper_;
+
+  // Whenever a message has been assembled, either increase
+  // `last_assembled_tsn_watermark_` or - if there are gaps - add the message's
+  // TSNs into delivered_tsns_ so that messages are not re-delivered on
+  // duplicate chunks.
+  UnwrappedTSN last_assembled_tsn_watermark_;
+  std::set<UnwrappedTSN> delivered_tsns_;
+  // Messages that have been reassembled, and will be returned by
+  // `FlushMessages`.
+  std::vector<DcSctpMessage> reassembled_messages_;
+
+  // If present, "deferred reset processing" mode is active.
+  absl::optional<DeferredResetStreams> deferred_reset_streams_;
+
+  // Contains the last request sequence number of the
+  // OutgoingSSNResetRequestParameter that was performed.
+  ReconfigRequestSN last_completed_reset_req_seq_nbr_ = ReconfigRequestSN(0);
+
+  // The number of "payload bytes" that are in this queue, in total.
+  size_t queued_bytes_ = 0;
+
+  // The actual implementation of ReassemblyStreams.
+  std::unique_ptr<ReassemblyStreams> streams_;
+};
+}  // namespace dcsctp
+
+#endif  // NET_DCSCTP_RX_REASSEMBLY_QUEUE_H_
diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc
new file mode 100644
index 0000000..e38372c
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_queue_test.cc
@@ -0,0 +1,298 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/reassembly_queue.h"
+
+#include <stddef.h>
+
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <iterator>
+#include <vector>
+
+#include "api/array_view.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/types.h"
+#include "net/dcsctp/testing/data_generator.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::ElementsAre;
+
+// The default maximum size of the Reassembly Queue.
+static constexpr size_t kBufferSize = 10000;
+
+static constexpr StreamID kStreamID(1);
+static constexpr SSN kSSN(0);
+static constexpr MID kMID(0);
+static constexpr FSN kFSN(0);
+static constexpr PPID kPPID(53);
+
+static constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4};
+static constexpr std::array<uint8_t, 4> kMessage2Payload = {5, 6, 7, 8};
+static constexpr std::array<uint8_t, 16> kLongPayload = {
+    1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
+
+MATCHER_P3(SctpMessageIs, stream_id, ppid, expected_payload, "") {
+  if (arg.stream_id() != stream_id) {
+    *result_listener << "the stream_id is " << *arg.stream_id();
+    return false;
+  }
+
+  if (arg.ppid() != ppid) {
+    *result_listener << "the ppid is " << *arg.ppid();
+    return false;
+  }
+
+  if (std::vector<uint8_t>(arg.payload().begin(), arg.payload().end()) !=
+      std::vector<uint8_t>(expected_payload.begin(), expected_payload.end())) {
+    *result_listener << "the payload is wrong";
+    return false;
+  }
+  return true;
+}
+
+class ReassemblyQueueTest : public testing::Test {
+ protected:
+  ReassemblyQueueTest() {}
+  DataGenerator gen_;
+};
+
+TEST_F(ReassemblyQueueTest, EmptyQueue) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  EXPECT_FALSE(reasm.HasMessages());
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+}
+
+TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessage) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+}
+
+TEST_F(ReassemblyQueueTest, LargeUnorderedChunkAllPermutations) {
+  std::vector<uint32_t> tsns = {10, 11, 12, 13};
+  rtc::ArrayView<const uint8_t> payload(kLongPayload);
+  do {
+    ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+
+    for (size_t i = 0; i < tsns.size(); i++) {
+      auto span = payload.subview((tsns[i] - 10) * 4, 4);
+      Data::IsBeginning is_beginning(tsns[i] == 10);
+      Data::IsEnd is_end(tsns[i] == 13);
+
+      reasm.Add(TSN(tsns[i]),
+                Data(kStreamID, kSSN, kMID, kFSN, kPPID,
+                     std::vector<uint8_t>(span.begin(), span.end()),
+                     is_beginning, is_end, IsUnordered(false)));
+      if (i < 3) {
+        EXPECT_FALSE(reasm.HasMessages());
+      } else {
+        EXPECT_TRUE(reasm.HasMessages());
+        EXPECT_THAT(reasm.FlushMessages(),
+                    ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload)));
+        EXPECT_EQ(reasm.queued_bytes(), 0u);
+      }
+    }
+  } while (std::next_permutation(std::begin(tsns), std::end(tsns)));
+}
+
+TEST_F(ReassemblyQueueTest, SingleOrderedChunkMessage) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
+}
+
+TEST_F(ReassemblyQueueTest, ManySmallOrderedMessages) {
+  std::vector<uint32_t> tsns = {10, 11, 12, 13};
+  rtc::ArrayView<const uint8_t> payload(kLongPayload);
+  do {
+    ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+    for (size_t i = 0; i < tsns.size(); i++) {
+      auto span = payload.subview((tsns[i] - 10) * 4, 4);
+      Data::IsBeginning is_beginning(true);
+      Data::IsEnd is_end(true);
+
+      SSN ssn(static_cast<uint16_t>(tsns[i] - 10));
+      reasm.Add(TSN(tsns[i]),
+                Data(kStreamID, ssn, kMID, kFSN, kPPID,
+                     std::vector<uint8_t>(span.begin(), span.end()),
+                     is_beginning, is_end, IsUnordered(false)));
+    }
+    EXPECT_THAT(
+        reasm.FlushMessages(),
+        ElementsAre(SctpMessageIs(kStreamID, kPPID, payload.subview(0, 4)),
+                    SctpMessageIs(kStreamID, kPPID, payload.subview(4, 4)),
+                    SctpMessageIs(kStreamID, kPPID, payload.subview(8, 4)),
+                    SctpMessageIs(kStreamID, kPPID, payload.subview(12, 4))));
+    EXPECT_EQ(reasm.queued_bytes(), 0u);
+  } while (std::next_permutation(std::begin(tsns), std::end(tsns)));
+}
+
+TEST_F(ReassemblyQueueTest, RetransmissionInLargeOrdered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
+  reasm.Add(TSN(12), gen_.Ordered({3}));
+  reasm.Add(TSN(13), gen_.Ordered({4}));
+  reasm.Add(TSN(14), gen_.Ordered({5}));
+  reasm.Add(TSN(15), gen_.Ordered({6}));
+  reasm.Add(TSN(16), gen_.Ordered({7}));
+  reasm.Add(TSN(17), gen_.Ordered({8}));
+  EXPECT_EQ(reasm.queued_bytes(), 7u);
+
+  // lost and retransmitted
+  reasm.Add(TSN(11), gen_.Ordered({2}));
+  reasm.Add(TSN(18), gen_.Ordered({9}));
+  reasm.Add(TSN(19), gen_.Ordered({10}));
+  EXPECT_EQ(reasm.queued_bytes(), 10u);
+  EXPECT_FALSE(reasm.HasMessages());
+
+  reasm.Add(TSN(20), gen_.Ordered({11, 12, 13, 14, 15, 16}, "E"));
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kLongPayload)));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+}
+
+TEST_F(ReassemblyQueueTest, ForwardTSNRemoveUnordered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Unordered({1}, "B"));
+  reasm.Add(TSN(12), gen_.Unordered({3}));
+  reasm.Add(TSN(13), gen_.Unordered({4}, "E"));
+
+  reasm.Add(TSN(14), gen_.Unordered({5}, "B"));
+  reasm.Add(TSN(15), gen_.Unordered({6}));
+  reasm.Add(TSN(17), gen_.Unordered({8}, "E"));
+  EXPECT_EQ(reasm.queued_bytes(), 6u);
+
+  EXPECT_FALSE(reasm.HasMessages());
+
+  reasm.Handle(ForwardTsnChunk(TSN(13), {}));
+  EXPECT_EQ(reasm.queued_bytes(), 3u);
+
+  // The lost chunk comes, but too late.
+  reasm.Add(TSN(11), gen_.Unordered({2}));
+  EXPECT_FALSE(reasm.HasMessages());
+  EXPECT_EQ(reasm.queued_bytes(), 3u);
+
+  // The second lost chunk comes, message is assembled.
+  reasm.Add(TSN(16), gen_.Unordered({7}));
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+}
+
+TEST_F(ReassemblyQueueTest, ForwardTSNRemoveOrdered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
+  reasm.Add(TSN(12), gen_.Ordered({3}));
+  reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
+
+  reasm.Add(TSN(14), gen_.Ordered({5}, "B"));
+  reasm.Add(TSN(15), gen_.Ordered({6}));
+  reasm.Add(TSN(16), gen_.Ordered({7}));
+  reasm.Add(TSN(17), gen_.Ordered({8}, "E"));
+  EXPECT_EQ(reasm.queued_bytes(), 7u);
+
+  EXPECT_FALSE(reasm.HasMessages());
+
+  reasm.Handle(ForwardTsnChunk(
+      TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+
+  // The lost chunk comes, but too late.
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
+}
+
+TEST_F(ReassemblyQueueTest, ForwardTSNRemoveALotOrdered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
+  reasm.Add(TSN(12), gen_.Ordered({3}));
+  reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
+
+  reasm.Add(TSN(15), gen_.Ordered({5}, "B"));
+  reasm.Add(TSN(16), gen_.Ordered({6}));
+  reasm.Add(TSN(17), gen_.Ordered({7}));
+  reasm.Add(TSN(18), gen_.Ordered({8}, "E"));
+  EXPECT_EQ(reasm.queued_bytes(), 7u);
+
+  EXPECT_FALSE(reasm.HasMessages());
+
+  reasm.Handle(ForwardTsnChunk(
+      TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+
+  // The lost chunk comes, but too late.
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
+}
+
+TEST_F(ReassemblyQueueTest, ShouldntDeliverMessagesBeforeInitialTsn) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(5), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_FALSE(reasm.HasMessages());
+}
+
+TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessages) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_TRUE(reasm.HasMessages());
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
+  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_FALSE(reasm.HasMessages());
+}
+
+TEST_F(ReassemblyQueueTest, ShouldntRedeliverUnorderedMessagesReallyUnordered) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Add(TSN(10), gen_.Unordered({1, 2, 3, 4}, "B"));
+  EXPECT_EQ(reasm.queued_bytes(), 4u);
+
+  EXPECT_FALSE(reasm.HasMessages());
+
+  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 4u);
+  EXPECT_TRUE(reasm.HasMessages());
+
+  EXPECT_THAT(reasm.FlushMessages(),
+              ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
+  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 4u);
+  EXPECT_FALSE(reasm.HasMessages());
+}
+
+TEST_F(ReassemblyQueueTest, ShouldntDeliverBeforeForwardedTsn) {
+  ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
+  reasm.Handle(ForwardTsnChunk(TSN(12), {}));
+
+  reasm.Add(TSN(12), gen_.Unordered({1, 2, 3, 4}, "BE"));
+  EXPECT_EQ(reasm.queued_bytes(), 0u);
+  EXPECT_FALSE(reasm.HasMessages());
+}
+
+}  // namespace
+}  // namespace dcsctp