dcsctp: Add Traditional Reassembly Streams
This class handles the assembly of fragmented received messages (as DATA
chunks) and manage per-stream queues. This class only handles
non-interleaved messages as described in RFC4960, and is not used when
message interleaving is enabled on the association, as described in
RFC8260.
This is also only part of the reassembly - a follow-up change will add
the ReassemblyQueue that handle the other part as well. And an even
further follow-up change will add a "interleaved reassembly stream".
Bug: webrtc:12614
Change-Id: Iaf339fa215a2b14926f5cb74f15528392e273f99
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214042
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33677}
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index 224c546..6c26e11 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -21,18 +21,42 @@
]
}
+rtc_source_set("reassembly_streams") {
+ deps = [ "../packet:chunk" ]
+ sources = [ "reassembly_streams.h" ]
+}
+
+rtc_library("traditional_reassembly_streams") {
+ deps = [
+ ":reassembly_streams",
+ "../../../api:array_view",
+ "../../../rtc_base",
+ "../../../rtc_base:checks",
+ "../../../rtc_base:rtc_base_approved",
+ ]
+ sources = [
+ "traditional_reassembly_streams.cc",
+ "traditional_reassembly_streams.h",
+ ]
+}
+
if (rtc_include_tests) {
rtc_library("dcsctp_rx_unittests") {
testonly = true
deps = [
":data_tracker",
+ ":traditional_reassembly_streams",
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:gunit_helpers",
"../../../rtc_base:rtc_base_approved",
"../../../test:test_support",
+ "../testing:data_generator",
]
- sources = [ "data_tracker_test.cc" ]
+ sources = [
+ "data_tracker_test.cc",
+ "traditional_reassembly_streams_test.cc",
+ ]
}
}
diff --git a/net/dcsctp/rx/reassembly_streams.h b/net/dcsctp/rx/reassembly_streams.h
new file mode 100644
index 0000000..a8b42b5
--- /dev/null
+++ b/net/dcsctp/rx/reassembly_streams.h
@@ -0,0 +1,84 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
+#define NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <vector>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+
+namespace dcsctp {
+
+// Implementations of this interface will be called when data is received, when
+// data should be skipped/forgotten or when sequence number should be reset.
+//
+// As a result of these operations - mainly when data is received - the
+// implementations of this interface should notify when a message has been
+// assembled, by calling the provided callback of type `OnAssembledMessage`. How
+// it assembles messages will depend on e.g. if a message was sent on an ordered
+// or unordered stream.
+//
+// Implementations will - for each operation - indicate how much additional
+// memory that has been used as a result of performing the operation. This is
+// used to limit the maximum amount of memory used, to prevent out-of-memory
+// situations.
+class ReassemblyStreams {
+ public:
+ // This callback will be provided as an argument to the constructor of the
+ // concrete class implementing this interface and should be called when a
+ // message has been assembled as well as indicating from which TSNs this
+ // message was assembled from.
+ using OnAssembledMessage =
+ std::function<void(rtc::ArrayView<const UnwrappedTSN> tsns,
+ DcSctpMessage message)>;
+
+ virtual ~ReassemblyStreams() = default;
+
+ // Adds a data chunk to a stream as identified in `data`.
+ // If it was the last remaining chunk in a message, reassemble one (or
+ // several, in case of ordered chunks) messages.
+ //
+ // Returns the additional number of bytes added to the queue as a result of
+ // performing this operation. If this addition resulted in messages being
+ // assembled and delivered, this may be negative.
+ virtual int Add(UnwrappedTSN tsn, Data data) = 0;
+
+ // Called for incoming FORWARD-TSN/I-FORWARD-TSN chunks - when the sender
+ // wishes the received to skip/forget about data up until the provided TSN.
+ // This is used to implement partial reliability, such as limiting the number
+ // of retransmissions or the an expiration duration. As a result of skipping
+ // data, this may result in the implementation being able to assemble messages
+ // in ordered streams.
+ //
+ // Returns the number of bytes removed from the queue as a result of
+ // this operation.
+ virtual size_t HandleForwardTsn(
+ UnwrappedTSN new_cumulative_ack_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream>
+ skipped_streams) = 0;
+
+ // Called for incoming (possibly deferred) RE_CONFIG chunks asking for
+ // either a few streams, or all streams (when the list is empty) to be
+ // reset - to have their next SSN or Message ID to be zero.
+ virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0;
+};
+
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc
new file mode 100644
index 0000000..caa97d2
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams.cc
@@ -0,0 +1,289 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+
+#include <stddef.h>
+
+#include <cstdint>
+#include <functional>
+#include <iterator>
+#include <map>
+#include <numeric>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "absl/algorithm/container.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_message.h"
+#include "rtc_base/logging.h"
+
+namespace dcsctp {
+namespace {
+
+// Given a map (`chunks`) and an iterator to within that map (`iter`), this
+// function will return an iterator to the first chunk in that message, which
+// has the `is_beginning` flag set. If there are any gaps, or if the beginning
+// can't be found, `absl::nullopt` is returned.
+absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindBeginning(
+ const std::map<UnwrappedTSN, Data>& chunks,
+ std::map<UnwrappedTSN, Data>::iterator iter) {
+ UnwrappedTSN prev_tsn = iter->first;
+ for (;;) {
+ if (iter->second.is_beginning) {
+ return iter;
+ }
+ if (iter == chunks.begin()) {
+ return absl::nullopt;
+ }
+ --iter;
+ if (iter->first.next_value() != prev_tsn) {
+ return absl::nullopt;
+ }
+ prev_tsn = iter->first;
+ }
+}
+
+// Given a map (`chunks`) and an iterator to within that map (`iter`), this
+// function will return an iterator to the chunk after the last chunk in that
+// message, which has the `is_end` flag set. If there are any gaps, or if the
+// end can't be found, `absl::nullopt` is returned.
+absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindEnd(
+ std::map<UnwrappedTSN, Data>& chunks,
+ std::map<UnwrappedTSN, Data>::iterator iter) {
+ UnwrappedTSN prev_tsn = iter->first;
+ for (;;) {
+ if (iter->second.is_end) {
+ return ++iter;
+ }
+ ++iter;
+ if (iter == chunks.end()) {
+ return absl::nullopt;
+ }
+ if (iter->first != prev_tsn.next_value()) {
+ return absl::nullopt;
+ }
+ prev_tsn = iter->first;
+ }
+}
+} // namespace
+
+int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
+ Data data) {
+ int queued_bytes = data.size();
+ auto p = chunks_.emplace(tsn, std::move(data));
+ if (!p.second /* !inserted */) {
+ return 0;
+ }
+
+ queued_bytes -= TryToAssembleMessage(p.first);
+
+ return queued_bytes;
+}
+
+size_t TraditionalReassemblyStreams::UnorderedStream::TryToAssembleMessage(
+ ChunkMap::iterator iter) {
+ // TODO(boivie): This method is O(N) with the number of fragments in a
+ // message, which can be inefficient for very large values of N. This could be
+ // optimized by e.g. only trying to assemble a message once _any_ beginning
+ // and _any_ end has been found.
+ absl::optional<ChunkMap::iterator> start = FindBeginning(chunks_, iter);
+ if (!start.has_value()) {
+ return 0;
+ }
+ absl::optional<ChunkMap::iterator> end = FindEnd(chunks_, iter);
+ if (!end.has_value()) {
+ return 0;
+ }
+
+ size_t bytes_assembled = AssembleMessage(*start, *end);
+ chunks_.erase(*start, *end);
+ return bytes_assembled;
+}
+
+size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
+ const ChunkMap::iterator start,
+ const ChunkMap::iterator end) {
+ size_t count = std::distance(start, end);
+
+ if (count == 1) {
+ // Fast path - zero-copy
+ const Data& data = start->second;
+ size_t payload_size = start->second.size();
+ UnwrappedTSN tsns[1] = {start->first};
+ DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
+ parent_.on_assembled_message_(tsns, std::move(message));
+ return payload_size;
+ }
+
+ // Slow path - will need to concatenate the payload.
+ std::vector<UnwrappedTSN> tsns;
+ std::vector<uint8_t> payload;
+
+ size_t payload_size = std::accumulate(
+ start, end, 0,
+ [](size_t v, const auto& p) { return v + p.second.size(); });
+
+ tsns.reserve(count);
+ payload.reserve(payload_size);
+ for (auto it = start; it != end; ++it) {
+ const Data& data = it->second;
+ tsns.push_back(it->first);
+ payload.insert(payload.end(), data.payload.begin(), data.payload.end());
+ }
+
+ DcSctpMessage message(start->second.stream_id, start->second.ppid,
+ std::move(payload));
+ parent_.on_assembled_message_(tsns, std::move(message));
+
+ return payload_size;
+}
+
+size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo(
+ UnwrappedTSN tsn) {
+ auto end_iter = chunks_.upper_bound(tsn);
+ size_t removed_bytes = std::accumulate(
+ chunks_.begin(), end_iter, 0,
+ [](size_t r, const auto& p) { return r + p.second.size(); });
+
+ chunks_.erase(chunks_.begin(), end_iter);
+ return removed_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessage() {
+ if (chunks_by_ssn_.empty() || chunks_by_ssn_.begin()->first != next_ssn_) {
+ return 0;
+ }
+
+ ChunkMap& chunks = chunks_by_ssn_.begin()->second;
+
+ if (!chunks.begin()->second.is_beginning || !chunks.rbegin()->second.is_end) {
+ return 0;
+ }
+
+ uint32_t tsn_diff = chunks.rbegin()->first.Difference(chunks.begin()->first);
+ if (tsn_diff != chunks.size() - 1) {
+ return 0;
+ }
+
+ size_t assembled_bytes = AssembleMessage(chunks.begin(), chunks.end());
+ chunks_by_ssn_.erase(chunks_by_ssn_.begin());
+ next_ssn_.Increment();
+ return assembled_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessages() {
+ size_t assembled_bytes = 0;
+
+ for (;;) {
+ size_t assembled_bytes_this_iter = TryToAssembleMessage();
+ if (assembled_bytes_this_iter == 0) {
+ break;
+ }
+ assembled_bytes += assembled_bytes_this_iter;
+ }
+ return assembled_bytes;
+}
+
+int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn,
+ Data data) {
+ int queued_bytes = data.size();
+
+ UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn);
+ auto p = chunks_by_ssn_[ssn].emplace(tsn, std::move(data));
+ if (!p.second /* !inserted */) {
+ return 0;
+ }
+
+ if (ssn == next_ssn_) {
+ queued_bytes -= TryToAssembleMessages();
+ }
+
+ return queued_bytes;
+}
+
+size_t TraditionalReassemblyStreams::OrderedStream::EraseTo(SSN ssn) {
+ UnwrappedSSN unwrapped_ssn = ssn_unwrapper_.Unwrap(ssn);
+
+ auto end_iter = chunks_by_ssn_.upper_bound(unwrapped_ssn);
+ size_t removed_bytes = std::accumulate(
+ chunks_by_ssn_.begin(), end_iter, 0, [](size_t r1, const auto& p) {
+ return r1 +
+ absl::c_accumulate(p.second, 0, [](size_t r2, const auto& q) {
+ return r2 + q.second.size();
+ });
+ });
+ chunks_by_ssn_.erase(chunks_by_ssn_.begin(), end_iter);
+
+ if (unwrapped_ssn >= next_ssn_) {
+ unwrapped_ssn.Increment();
+ next_ssn_ = unwrapped_ssn;
+ }
+
+ removed_bytes += TryToAssembleMessages();
+ return removed_bytes;
+}
+
+int TraditionalReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
+ if (data.is_unordered) {
+ auto it = unordered_streams_.emplace(data.stream_id, this).first;
+ return it->second.Add(tsn, std::move(data));
+ }
+
+ auto it = ordered_streams_.emplace(data.stream_id, this).first;
+ return it->second.Add(tsn, std::move(data));
+}
+
+size_t TraditionalReassemblyStreams::HandleForwardTsn(
+ UnwrappedTSN new_cumulative_ack_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+ size_t bytes_removed = 0;
+ // The `skipped_streams` only over ordered messages - need to
+ // iterate all unordered streams manually to remove those chunks.
+ for (auto& entry : unordered_streams_) {
+ bytes_removed += entry.second.EraseTo(new_cumulative_ack_tsn);
+ }
+
+ for (const auto& skipped_stream : skipped_streams) {
+ auto it = ordered_streams_.find(skipped_stream.stream_id);
+ if (it != ordered_streams_.end()) {
+ bytes_removed += it->second.EraseTo(skipped_stream.ssn);
+ }
+ }
+
+ return bytes_removed;
+}
+
+void TraditionalReassemblyStreams::ResetStreams(
+ rtc::ArrayView<const StreamID> stream_ids) {
+ if (stream_ids.empty()) {
+ for (auto& entry : ordered_streams_) {
+ const StreamID& stream_id = entry.first;
+ OrderedStream& stream = entry.second;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Resetting implicit stream_id=" << *stream_id;
+ stream.Reset();
+ }
+ } else {
+ for (StreamID stream_id : stream_ids) {
+ auto it = ordered_streams_.find(stream_id);
+ if (it != ordered_streams_.end()) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_ << "Resetting explicit stream_id=" << *stream_id;
+ it->second.Reset();
+ }
+ }
+ }
+}
+} // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.h b/net/dcsctp/rx/traditional_reassembly_streams.h
new file mode 100644
index 0000000..12d1d933
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams.h
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
+#define NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
+#include <stddef.h>
+#include <stdint.h>
+
+#include <map>
+#include <string>
+#include <unordered_map>
+
+#include "absl/strings/string_view.h"
+#include "api/array_view.h"
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+
+namespace dcsctp {
+
+// Handles reassembly of incoming data when interleaved message sending
+// is not enabled on the association, i.e. when RFC8260 is not in use and
+// RFC4960 is to be followed.
+class TraditionalReassemblyStreams : public ReassemblyStreams {
+ public:
+ TraditionalReassemblyStreams(absl::string_view log_prefix,
+ OnAssembledMessage on_assembled_message)
+ : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
+
+ int Add(UnwrappedTSN tsn, Data data) override;
+
+ size_t HandleForwardTsn(
+ UnwrappedTSN new_cumulative_ack_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)
+ override;
+
+ void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
+
+ private:
+ using ChunkMap = std::map<UnwrappedTSN, Data>;
+
+ // Base class for `UnorderedStream` and `OrderedStream`.
+ class StreamBase {
+ protected:
+ explicit StreamBase(TraditionalReassemblyStreams* parent)
+ : parent_(*parent) {}
+
+ size_t AssembleMessage(const ChunkMap::iterator start,
+ const ChunkMap::iterator end);
+ TraditionalReassemblyStreams& parent_;
+ };
+
+ // Manages all received data for a specific unordered stream, and assembles
+ // messages when possible.
+ class UnorderedStream : StreamBase {
+ public:
+ explicit UnorderedStream(TraditionalReassemblyStreams* parent)
+ : StreamBase(parent) {}
+ int Add(UnwrappedTSN tsn, Data data);
+ // Returns the number of bytes removed from the queue.
+ size_t EraseTo(UnwrappedTSN tsn);
+
+ private:
+ // Given an iterator to any chunk within the map, try to assemble a message
+ // into `reassembled_messages` containing it and - if successful - erase
+ // those chunks from the stream chunks map.
+ //
+ // Returns the number of bytes that were assembled.
+ size_t TryToAssembleMessage(ChunkMap::iterator iter);
+
+ ChunkMap chunks_;
+ };
+
+ // Manages all received data for a specific ordered stream, and assembles
+ // messages when possible.
+ class OrderedStream : StreamBase {
+ public:
+ explicit OrderedStream(TraditionalReassemblyStreams* parent)
+ : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {}
+ int Add(UnwrappedTSN tsn, Data data);
+ size_t EraseTo(SSN ssn);
+ void Reset() {
+ ssn_unwrapper_.Reset();
+ next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0));
+ }
+
+ private:
+ // Try to assemble one or several messages in order from the stream.
+ // Returns the number of bytes assembled if a message was assembled.
+ size_t TryToAssembleMessage();
+ size_t TryToAssembleMessages();
+ // This must be an ordered container to be able to iterate in SSN order.
+ std::map<UnwrappedSSN, ChunkMap> chunks_by_ssn_;
+ UnwrappedSSN::Unwrapper ssn_unwrapper_;
+ UnwrappedSSN next_ssn_;
+ };
+
+ const std::string log_prefix_;
+
+ // Callback for when a message has been assembled.
+ const OnAssembledMessage on_assembled_message_;
+
+ // All unordered and ordered streams, managing not-yet-assembled data.
+ std::unordered_map<StreamID, UnorderedStream, StreamID::Hasher>
+ unordered_streams_;
+ std::unordered_map<StreamID, OrderedStream, StreamID::Hasher>
+ ordered_streams_;
+};
+
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
new file mode 100644
index 0000000..30d29a0
--- /dev/null
+++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
@@ -0,0 +1,152 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#include "net/dcsctp/rx/traditional_reassembly_streams.h"
+
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "net/dcsctp/common/sequence_numbers.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
+#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/rx/reassembly_streams.h"
+#include "net/dcsctp/testing/data_generator.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+
+namespace dcsctp {
+namespace {
+using ::testing::MockFunction;
+using ::testing::NiceMock;
+
+class TraditionalReassemblyStreamsTest : public testing::Test {
+ protected:
+ UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); }
+
+ TraditionalReassemblyStreamsTest() {}
+ DataGenerator gen_;
+ UnwrappedTSN::Unwrapper tsn_;
+};
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ AddUnorderedMessageReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+ // Adding the end fragment should make it empty again.
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ AddSimpleOrderedMessageReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ AddMoreComplexOrderedMessageReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ Data late = gen_.Ordered({2, 3, 4});
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+ EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+ EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+ EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+ EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ DeleteUnorderedMessageReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+
+ EXPECT_EQ(streams.HandleForwardTsn(tsn(3), {}), 6u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ DeleteSimpleOrderedMessageReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+
+ ForwardTsnChunk::SkippedStream skipped[] = {
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
+ EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ DeleteManyOrderedMessagesReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ gen_.Ordered({2, 3, 4});
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+ EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+ EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+ EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+ // Expire all three messages
+ ForwardTsnChunk::SkippedStream skipped[] = {
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(2))};
+ EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ DeleteOrderedMessageDelivesTwoReturnsCorrectSize) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ gen_.Ordered({2, 3, 4});
+ EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+ EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
+
+ EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
+ EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
+ EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
+
+ // The first ordered message expire, and the following two are delivered.
+ ForwardTsnChunk::SkippedStream skipped[] = {
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
+ EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
+}
+
+} // namespace
+} // namespace dcsctp