dcsctp: introduce handover API types and implement it for streams
Bug: webrtc:13154
Change-Id: Ifa250175af79b7adc87dbc2750054adc94b90bb7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/231842
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Sergey Sukhanov <sergeysu@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#34991}
diff --git a/net/dcsctp/public/BUILD.gn b/net/dcsctp/public/BUILD.gn
index 23530a6..8fb521c 100644
--- a/net/dcsctp/public/BUILD.gn
+++ b/net/dcsctp/public/BUILD.gn
@@ -27,6 +27,7 @@
rtc_source_set("socket") {
deps = [
+ ":strong_alias",
":types",
"../../../api:array_view",
"../../../rtc_base",
@@ -34,6 +35,7 @@
"../../../rtc_base:rtc_base_approved",
]
sources = [
+ "dcsctp_handover_state.h",
"dcsctp_socket.h",
"packet_observer.h",
"timeout.h",
diff --git a/net/dcsctp/public/dcsctp_handover_state.h b/net/dcsctp/public/dcsctp_handover_state.h
new file mode 100644
index 0000000..d1267ca
--- /dev/null
+++ b/net/dcsctp/public/dcsctp_handover_state.h
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_
+#define NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include "net/dcsctp/public/strong_alias.h"
+
+namespace dcsctp {
+
+// Stores state snapshot of a dcSCTP socket. The snapshot can be used to
+// recreate the socket - possibly in another process. This state should be
+// treaded as opaque - the calling client should not inspect or alter it except
+// for serialization. Serialization is not provided by dcSCTP. If needed it has
+// to be implemented in the calling client.
+struct DcSctpSocketHandoverState {
+ struct OrderedStream {
+ uint32_t id = 0;
+ uint32_t next_ssn = 0;
+ };
+ struct UnorderedStream {
+ uint32_t id = 0;
+ };
+ struct Receive {
+ std::vector<OrderedStream> ordered_streams;
+ std::vector<UnorderedStream> unordered_streams;
+ };
+ Receive rx;
+};
+
+// A list of possible reasons for a socket to be not ready for handover.
+enum class HandoverUnreadinessReason : uint32_t {
+ kWrongConnectionState = 1,
+ kSendQueueNotEmpty = 2,
+ kDataTrackerNotIdle = 4,
+ kDataTrackerTsnBlocksPending = 8,
+ kReassemblyQueueNotEmpty = 16,
+ kReassemblyQueueDeliveredTSNsGap = 32,
+ kStreamResetDeferred = 64,
+ kOrderedStreamHasUnassembledChunks = 128,
+ kUnorderedStreamHasUnassembledChunks = 256,
+ kRetransmissionQueueOutstandingData = 512,
+ kRetransmissionQueueFastRecovery = 1024,
+ kRetransmissionQueueNotEmpty = 2048,
+ kPendingStreamReset = 4096,
+ kPendingStreamResetRequest = 8192,
+ kMax = kPendingStreamResetRequest,
+};
+
+// Return value of `DcSctpSocketInterface::GetHandoverReadiness`. Set of
+// `HandoverUnreadinessReason` bits. When no bit is set, the socket is in the
+// state in which a snapshot of the state can be made by
+// `GetHandoverStateAndClose()`.
+class HandoverReadinessStatus
+ : public StrongAlias<class HandoverReadinessStatusTag, uint32_t> {
+ public:
+ // Constructs an empty `HandoverReadinessStatus` which represents ready state.
+ constexpr HandoverReadinessStatus()
+ : StrongAlias<class HandoverReadinessStatusTag, uint32_t>(0) {}
+ // Constructs status object that contains a single reason for not being
+ // handover ready.
+ constexpr explicit HandoverReadinessStatus(HandoverUnreadinessReason reason)
+ : StrongAlias<class HandoverReadinessStatusTag, uint32_t>(
+ static_cast<uint32_t>(reason)) {}
+
+ // Convenience methods
+ constexpr bool IsReady() const { return value() == 0; }
+ constexpr bool Contains(HandoverUnreadinessReason reason) const {
+ return value() & static_cast<uint32_t>(reason);
+ }
+ HandoverReadinessStatus& Add(HandoverUnreadinessReason reason) {
+ return Add(HandoverReadinessStatus(reason));
+ }
+ HandoverReadinessStatus& Add(HandoverReadinessStatus status) {
+ value() |= status.value();
+ return *this;
+ }
+};
+
+} // namespace dcsctp
+
+#endif // NET_DCSCTP_PUBLIC_DCSCTP_HANDOVER_STATE_H_
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index fb92513..6c49648 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -36,6 +36,7 @@
"../common:sequence_numbers",
"../packet:chunk",
"../packet:data",
+ "../public:socket",
"../public:types",
]
sources = [ "reassembly_streams.h" ]
diff --git a/net/dcsctp/rx/reassembly_streams.h b/net/dcsctp/rx/reassembly_streams.h
index a8b42b5..06f1a78 100644
--- a/net/dcsctp/rx/reassembly_streams.h
+++ b/net/dcsctp/rx/reassembly_streams.h
@@ -21,6 +21,7 @@
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
+#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_message.h"
namespace dcsctp {
@@ -77,6 +78,9 @@
// either a few streams, or all streams (when the list is empty) to be
// reset - to have their next SSN or Message ID to be zero.
virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0;
+
+ virtual HandoverReadinessStatus GetHandoverReadiness() const = 0;
+ virtual void AddHandoverState(DcSctpSocketHandoverState& state) = 0;
};
} // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.cc b/net/dcsctp/rx/traditional_reassembly_streams.cc
index 4108d37..c9af293 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams.cc
+++ b/net/dcsctp/rx/traditional_reassembly_streams.cc
@@ -78,6 +78,30 @@
}
} // namespace
+TraditionalReassemblyStreams::TraditionalReassemblyStreams(
+ absl::string_view log_prefix,
+ OnAssembledMessage on_assembled_message,
+ const DcSctpSocketHandoverState* handover_state)
+ : log_prefix_(log_prefix),
+ on_assembled_message_(std::move(on_assembled_message)) {
+ if (handover_state) {
+ for (const DcSctpSocketHandoverState::OrderedStream& state_stream :
+ handover_state->rx.ordered_streams) {
+ ordered_streams_.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(StreamID(state_stream.id)),
+ std::forward_as_tuple(this, SSN(state_stream.next_ssn)));
+ }
+ for (const DcSctpSocketHandoverState::UnorderedStream& state_stream :
+ handover_state->rx.unordered_streams) {
+ unordered_streams_.emplace(
+ std::piecewise_construct,
+ std::forward_as_tuple(StreamID(state_stream.id)),
+ std::forward_as_tuple(this));
+ }
+ }
+}
+
int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
Data data) {
int queued_bytes = data.size();
@@ -286,4 +310,39 @@
}
}
}
+
+HandoverReadinessStatus TraditionalReassemblyStreams::GetHandoverReadiness()
+ const {
+ HandoverReadinessStatus status;
+ for (const auto& entry : ordered_streams_) {
+ if (entry.second.has_unassembled_chunks()) {
+ status.Add(HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
+ break;
+ }
+ }
+ for (const auto& entry : unordered_streams_) {
+ if (entry.second.has_unassembled_chunks()) {
+ status.Add(
+ HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks);
+ break;
+ }
+ }
+ return status;
+}
+
+void TraditionalReassemblyStreams::AddHandoverState(
+ DcSctpSocketHandoverState& state) {
+ for (const auto& entry : ordered_streams_) {
+ DcSctpSocketHandoverState::OrderedStream state_stream;
+ state_stream.id = entry.first.value();
+ state_stream.next_ssn = entry.second.next_ssn().value();
+ state.rx.ordered_streams.push_back(std::move(state_stream));
+ }
+ for (const auto& entry : unordered_streams_) {
+ DcSctpSocketHandoverState::UnorderedStream state_stream;
+ state_stream.id = entry.first.value();
+ state.rx.unordered_streams.push_back(std::move(state_stream));
+ }
+}
+
} // namespace dcsctp
diff --git a/net/dcsctp/rx/traditional_reassembly_streams.h b/net/dcsctp/rx/traditional_reassembly_streams.h
index d7ae2dd..0c72432 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams.h
+++ b/net/dcsctp/rx/traditional_reassembly_streams.h
@@ -29,9 +29,10 @@
// RFC4960 is to be followed.
class TraditionalReassemblyStreams : public ReassemblyStreams {
public:
- TraditionalReassemblyStreams(absl::string_view log_prefix,
- OnAssembledMessage on_assembled_message)
- : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
+ TraditionalReassemblyStreams(
+ absl::string_view log_prefix,
+ OnAssembledMessage on_assembled_message,
+ const DcSctpSocketHandoverState* handover_state = nullptr);
int Add(UnwrappedTSN tsn, Data data) override;
@@ -42,6 +43,9 @@
void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
+ HandoverReadinessStatus GetHandoverReadiness() const override;
+ void AddHandoverState(DcSctpSocketHandoverState& state) override;
+
private:
using ChunkMap = std::map<UnwrappedTSN, Data>;
@@ -65,6 +69,7 @@
int Add(UnwrappedTSN tsn, Data data);
// Returns the number of bytes removed from the queue.
size_t EraseTo(UnwrappedTSN tsn);
+ bool has_unassembled_chunks() const { return !chunks_.empty(); }
private:
// Given an iterator to any chunk within the map, try to assemble a message
@@ -81,14 +86,17 @@
// messages when possible.
class OrderedStream : StreamBase {
public:
- explicit OrderedStream(TraditionalReassemblyStreams* parent)
- : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {}
+ explicit OrderedStream(TraditionalReassemblyStreams* parent,
+ SSN next_ssn = SSN(0))
+ : StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(next_ssn)) {}
int Add(UnwrappedTSN tsn, Data data);
size_t EraseTo(SSN ssn);
void Reset() {
ssn_unwrapper_.Reset();
next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0));
}
+ SSN next_ssn() const { return next_ssn_.Wrap(); }
+ bool has_unassembled_chunks() const { return !chunks_by_ssn_.empty(); }
private:
// Try to assemble one or several messages in order from the stream.
diff --git a/net/dcsctp/rx/traditional_reassembly_streams_test.cc b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
index 30d29a0..f58bfed 100644
--- a/net/dcsctp/rx/traditional_reassembly_streams_test.cc
+++ b/net/dcsctp/rx/traditional_reassembly_streams_test.cc
@@ -148,5 +148,85 @@
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
}
+TEST_F(TraditionalReassemblyStreamsTest, NoStreamsCanBeHandedOver) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
+ EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
+
+ DcSctpSocketHandoverState state;
+ streams1.AddHandoverState(state);
+ TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
+ &state);
+
+ EXPECT_EQ(streams2.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ EXPECT_EQ(streams2.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams2.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+ EXPECT_EQ(streams2.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ OrderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams1.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
+ EXPECT_EQ(streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
+ EXPECT_EQ(streams1.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
+ EXPECT_EQ(streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
+ EXPECT_EQ(streams1.Add(tsn(3), gen_.Ordered({5, 6})), 2);
+ EXPECT_EQ(streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks));
+
+ ForwardTsnChunk::SkippedStream skipped[] = {
+ ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
+ EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), skipped), 6u);
+ EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
+
+ DcSctpSocketHandoverState state;
+ streams1.AddHandoverState(state);
+ TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
+ &state);
+ EXPECT_EQ(streams2.Add(tsn(4), gen_.Ordered({7})), 1);
+}
+
+TEST_F(TraditionalReassemblyStreamsTest,
+ UnorderedStreamsCanBeHandedOverWhenNoUnassembledChunksExist) {
+ NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
+
+ TraditionalReassemblyStreams streams1("", on_assembled.AsStdFunction());
+
+ EXPECT_EQ(streams1.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
+ EXPECT_EQ(
+ streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
+ EXPECT_EQ(streams1.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
+ EXPECT_EQ(
+ streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
+ EXPECT_EQ(streams1.Add(tsn(3), gen_.Unordered({5, 6})), 2);
+ EXPECT_EQ(
+ streams1.GetHandoverReadiness(),
+ HandoverReadinessStatus(
+ HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
+
+ EXPECT_EQ(streams1.HandleForwardTsn(tsn(3), {}), 6u);
+ EXPECT_TRUE(streams1.GetHandoverReadiness().IsReady());
+
+ DcSctpSocketHandoverState state;
+ streams1.AddHandoverState(state);
+ TraditionalReassemblyStreams streams2("", on_assembled.AsStdFunction(),
+ &state);
+ EXPECT_EQ(streams2.Add(tsn(4), gen_.Unordered({7})), 1);
+}
+
} // namespace
} // namespace dcsctp