dcsctp: Reset synchronously with incoming request
When a sender has requested a stream to be reset, and the last sender
assigned TSN hasn't been received yet, the receiver will enter deferred
reset mode, where it will store any data chunks received after that
given TSN, and replay those later, when the stream has been reset.
Before this CL, leaving deferred mode was done as soon as the sender's
last assigned TSN was received. That's actually not how the RFC
describes the process[1], but was done that way to properly handle some
sequences of RE-CONFIG and FORWARD-TSN. But after having read the RFCs
again, and realizing that whenever RFC6525 mention "any data arriving",
this also applies to any FORWARD-TSN[2] - it's better to reset streams
synchronously with the incoming requests, and defer not just DATA past
the sender last assigned TSN, but also any FORWARD-TSN after that TSN.
This mostly simplifies the code and is mostly a refactoring, but most
importantly aligns it with how the resetting procedure is explained in
the RFC. It also fixes two bugs:
* It defers FORWARD-TSN *as well as* DATA chunks with a TSN later
than the sender's last assigned TSN - see test case. The old
implementation tried to handle that by exiting the deferred reset
processing as soon as it reached the sender's last assigned TSN, but
it didn't manage to do that in all cases.
* It only defers DATA chunks for streams that are to be reset, not
all DATA chunks with a TSN > sender's last assigned TSN. This was
missed in the old implementation, but as it's now implemented
strictly according to the RFC, this was now done.
[1] https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2
[2] RFC6525 cover stream resetting, and RFC3758 cover FORWARD-TSN, and
the combination of these is not covered in the RFCs.
Bug: webrtc:14600
Change-Id: Ief878b755291b9c923aa6fb4317b0f5c00231df4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/322623
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40889}
diff --git a/net/dcsctp/rx/BUILD.gn b/net/dcsctp/rx/BUILD.gn
index d66fd6b..f5f5b7e 100644
--- a/net/dcsctp/rx/BUILD.gn
+++ b/net/dcsctp/rx/BUILD.gn
@@ -95,6 +95,7 @@
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:logging",
+ "../../../rtc_base/containers:flat_set",
"../common:internal_types",
"../common:sequence_numbers",
"../common:str_join",
@@ -109,6 +110,7 @@
"reassembly_queue.h",
]
absl_deps = [
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
]
diff --git a/net/dcsctp/rx/data_tracker.h b/net/dcsctp/rx/data_tracker.h
index 62a1232..9991ee6 100644
--- a/net/dcsctp/rx/data_tracker.h
+++ b/net/dcsctp/rx/data_tracker.h
@@ -93,6 +93,10 @@
return TSN(last_cumulative_acked_tsn_.Wrap());
}
+ bool IsLaterThanCumulativeAckedTsn(TSN tsn) const {
+ return tsn_unwrapper_.PeekUnwrap(tsn) > last_cumulative_acked_tsn_;
+ }
+
// Returns true if the received `tsn` would increase the cumulative ack TSN.
bool will_increase_cum_ack_tsn(TSN tsn) const;
diff --git a/net/dcsctp/rx/reassembly_queue.cc b/net/dcsctp/rx/reassembly_queue.cc
index 0b0d8e7..5734436 100644
--- a/net/dcsctp/rx/reassembly_queue.cc
+++ b/net/dcsctp/rx/reassembly_queue.cc
@@ -29,6 +29,7 @@
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
@@ -68,7 +69,6 @@
use_message_interleaving)) {}
void ReassemblyQueue::Add(TSN tsn, Data data) {
- RTC_DCHECK(IsConsistent());
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
<< ", stream=" << *data.stream_id << ":" << *data.mid
<< ":" << *data.fsn << ", type="
@@ -83,21 +83,23 @@
// the future, the socket is in "deferred reset processing" mode and must
// buffer chunks until it's exited.
if (deferred_reset_streams_.has_value() &&
- unwrapped_tsn >
- tsn_unwrapper_.Unwrap(
- deferred_reset_streams_->req.sender_last_assigned_tsn())) {
+ unwrapped_tsn > deferred_reset_streams_->sender_last_assigned_tsn &&
+ deferred_reset_streams_->streams.contains(data.stream_id)) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_ << "Deferring chunk with tsn=" << *tsn
- << " until cum_ack_tsn="
- << *deferred_reset_streams_->req.sender_last_assigned_tsn();
+ << ", sid=" << *data.stream_id << " until tsn="
+ << *deferred_reset_streams_->sender_last_assigned_tsn.Wrap();
// https://tools.ietf.org/html/rfc6525#section-5.2.2
// "In this mode, any data arriving with a TSN larger than the
// Sender's Last Assigned TSN for the affected stream(s) MUST be queued
// locally and held until the cumulative acknowledgment point reaches the
// Sender's Last Assigned TSN."
queued_bytes_ += data.size();
- deferred_reset_streams_->deferred_chunks.emplace_back(
- std::make_pair(tsn, std::move(data)));
+ deferred_reset_streams_->deferred_actions.push_back(
+ [this, tsn, data = std::move(data)]() mutable {
+ queued_bytes_ -= data.size();
+ Add(tsn, std::move(data));
+ });
} else {
queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
}
@@ -113,83 +115,51 @@
RTC_DCHECK(IsConsistent());
}
-ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
- const OutgoingSSNResetRequestParameter& req,
- TSN cum_tsn_ack) {
- RTC_DCHECK(IsConsistent());
- if (deferred_reset_streams_.has_value()) {
- // In deferred mode already.
- return ReconfigurationResponseParameter::Result::kInProgress;
- } else if (req.request_sequence_number() <=
- last_completed_reset_req_seq_nbr_) {
- // Already performed at some time previously.
- return ReconfigurationResponseParameter::Result::kSuccessPerformed;
- }
-
- UnwrappedTSN sla_tsn = tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
- UnwrappedTSN unwrapped_cum_tsn_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);
-
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "If the Sender's Last Assigned TSN is greater than the
- // cumulative acknowledgment point, then the endpoint MUST enter "deferred
- // reset processing"."
- if (sla_tsn > unwrapped_cum_tsn_ack) {
- RTC_DLOG(LS_VERBOSE)
- << log_prefix_
- << "Entering deferred reset processing mode until cum_tsn_ack="
- << *req.sender_last_assigned_tsn();
- deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
- return ReconfigurationResponseParameter::Result::kInProgress;
- }
+void ReassemblyQueue::ResetStreamsAndLeaveDeferredReset(
+ rtc::ArrayView<const StreamID> stream_ids) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Resetting streams: ["
+ << StrJoin(stream_ids, ",",
+ [](rtc::StringBuilder& sb, StreamID sid) {
+ sb << *sid;
+ })
+ << "]";
// https://tools.ietf.org/html/rfc6525#section-5.2.2
// "... streams MUST be reset to 0 as the next expected SSN."
- streams_->ResetStreams(req.stream_ids());
- last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
- RTC_DCHECK(IsConsistent());
- return ReconfigurationResponseParameter::Result::kSuccessPerformed;
-}
+ streams_->ResetStreams(stream_ids);
-bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
- RTC_DCHECK(IsConsistent());
if (deferred_reset_streams_.has_value()) {
- UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
- UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
- deferred_reset_streams_->req.sender_last_assigned_tsn());
- if (unwrapped_cum_ack_tsn >= unwrapped_sla_tsn) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_
- << "Leaving deferred reset processing with tsn="
- << *cum_ack_tsn << ", feeding back "
- << deferred_reset_streams_->deferred_chunks.size()
- << " chunks";
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "... streams MUST be reset to 0 as the next expected SSN."
- streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
- std::vector<std::pair<TSN, Data>> deferred_chunks =
- std::move(deferred_reset_streams_->deferred_chunks);
- // The response will not be sent now, but as a reply to the retried
- // request, which will come as "in progress" has been sent prior.
- last_completed_reset_req_seq_nbr_ =
- deferred_reset_streams_->req.request_sequence_number();
- deferred_reset_streams_ = absl::nullopt;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Leaving deferred reset processing, feeding back "
+ << deferred_reset_streams_->deferred_actions.size()
+ << " actions";
+ // https://tools.ietf.org/html/rfc6525#section-5.2.2
+ // "Any queued TSNs (queued at step E2) MUST now be released and processed
+ // normally."
+ auto deferred_actions =
+ std::move(deferred_reset_streams_->deferred_actions);
+ deferred_reset_streams_ = absl::nullopt;
- // https://tools.ietf.org/html/rfc6525#section-5.2.2
- // "Any queued TSNs (queued at step E2) MUST now be released and processed
- // normally."
- for (auto& [tsn, data] : deferred_chunks) {
- queued_bytes_ -= data.size();
- Add(tsn, std::move(data));
- }
-
- RTC_DCHECK(IsConsistent());
- return true;
- } else {
- RTC_DLOG(LS_VERBOSE) << "Staying in deferred reset processing. tsn="
- << *cum_ack_tsn;
+ for (auto& action : deferred_actions) {
+ action();
}
}
- return false;
+ RTC_DCHECK(IsConsistent());
+}
+
+void ReassemblyQueue::EnterDeferredReset(
+ TSN sender_last_assigned_tsn,
+ rtc::ArrayView<const StreamID> streams) {
+ if (!deferred_reset_streams_.has_value()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Entering deferred reset; sender_last_assigned_tsn="
+ << *sender_last_assigned_tsn;
+ deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(
+ tsn_unwrapper_.Unwrap(sender_last_assigned_tsn),
+ webrtc::flat_set<StreamID>(streams.begin(), streams.end()));
+ }
+ RTC_DCHECK(IsConsistent());
}
std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
@@ -237,18 +207,32 @@
}
}
-void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
- RTC_DCHECK(IsConsistent());
- UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());
+void ReassemblyQueue::HandleForwardTsn(
+ TSN new_cumulative_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
+ UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(new_cumulative_tsn);
+ if (deferred_reset_streams_.has_value() &&
+ tsn > deferred_reset_streams_->sender_last_assigned_tsn) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
+ << "- deferring.";
+ deferred_reset_streams_->deferred_actions.emplace_back(
+ [this, new_cumulative_tsn,
+ streams = std::vector<AnyForwardTsnChunk::SkippedStream>(
+ skipped_streams.begin(), skipped_streams.end())] {
+ HandleForwardTsn(new_cumulative_tsn, streams);
+ });
+ RTC_DCHECK(IsConsistent());
+ return;
+ }
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "ForwardTSN to " << *tsn.Wrap()
+ << " - performing.";
last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
delivered_tsns_.erase(delivered_tsns_.begin(),
delivered_tsns_.upper_bound(tsn));
-
MaybeMoveLastAssembledWatermarkFurther();
-
- queued_bytes_ -=
- streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
+ queued_bytes_ -= streams_->HandleForwardTsn(tsn, skipped_streams);
RTC_DCHECK(IsConsistent());
}
diff --git a/net/dcsctp/rx/reassembly_queue.h b/net/dcsctp/rx/reassembly_queue.h
index e1f231e..761ec35 100644
--- a/net/dcsctp/rx/reassembly_queue.h
+++ b/net/dcsctp/rx/reassembly_queue.h
@@ -19,6 +19,7 @@
#include <utility>
#include <vector>
+#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "net/dcsctp/common/internal_types.h"
@@ -30,6 +31,7 @@
#include "net/dcsctp/public/dcsctp_handover_state.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/rx/reassembly_streams.h"
+#include "rtc_base/containers/flat_set.h"
namespace dcsctp {
@@ -88,18 +90,18 @@
// Handle a ForwardTSN chunk, when the sender has indicated that the received
// (this class) should forget about some chunks. This is used to implement
// partial reliability.
- void Handle(const AnyForwardTsnChunk& forward_tsn);
+ void HandleForwardTsn(
+ TSN new_cumulative_tsn,
+ rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams);
- // Given the reset stream request and the current cum_tsn_ack, might either
- // reset the streams directly (returns kSuccessPerformed), or at a later time,
- // by entering the "deferred reset processing" mode (returns kInProgress).
- ReconfigurationResponseParameter::Result ResetStreams(
- const OutgoingSSNResetRequestParameter& req,
- TSN cum_tsn_ack);
+ // Resets the provided streams and leaves deferred reset processing, if
+ // enabled.
+ void ResetStreamsAndLeaveDeferredReset(
+ rtc::ArrayView<const StreamID> stream_ids);
- // Given the current (updated) cum_tsn_ack, might leave "defererred reset
- // processing" mode and reset streams. Returns true if so.
- bool MaybeResetStreamsDeferred(TSN cum_ack_tsn);
+ // Enters deferred reset processing.
+ void EnterDeferredReset(TSN sender_last_assigned_tsn,
+ rtc::ArrayView<const StreamID> streams);
// The number of payload bytes that have been queued. Note that the actual
// memory usage is higher due to additional overhead of tracking received
@@ -126,18 +128,22 @@
void RestoreFromState(const DcSctpSocketHandoverState& state);
private:
+ struct DeferredResetStreams {
+ DeferredResetStreams(UnwrappedTSN sender_last_assigned_tsn,
+ webrtc::flat_set<StreamID> streams)
+ : sender_last_assigned_tsn(sender_last_assigned_tsn),
+ streams(std::move(streams)) {}
+
+ UnwrappedTSN sender_last_assigned_tsn;
+ webrtc::flat_set<StreamID> streams;
+ std::vector<absl::AnyInvocable<void(void)>> deferred_actions;
+ };
+
bool IsConsistent() const;
void AddReassembledMessage(rtc::ArrayView<const UnwrappedTSN> tsns,
DcSctpMessage message);
void MaybeMoveLastAssembledWatermarkFurther();
- struct DeferredResetStreams {
- explicit DeferredResetStreams(OutgoingSSNResetRequestParameter req)
- : req(std::move(req)) {}
- OutgoingSSNResetRequestParameter req;
- std::vector<std::pair<TSN, Data>> deferred_chunks;
- };
-
const absl::string_view log_prefix_;
const size_t max_size_bytes_;
const size_t watermark_bytes_;
diff --git a/net/dcsctp/rx/reassembly_queue_test.cc b/net/dcsctp/rx/reassembly_queue_test.cc
index 0cd2695..fd8c423 100644
--- a/net/dcsctp/rx/reassembly_queue_test.cc
+++ b/net/dcsctp/rx/reassembly_queue_test.cc
@@ -34,6 +34,7 @@
using ::testing::ElementsAre;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
+using SkippedStream = AnyForwardTsnChunk::SkippedStream;
// The default maximum size of the Reassembly Queue.
static constexpr size_t kBufferSize = 10000;
@@ -194,7 +195,7 @@
EXPECT_FALSE(reasm.HasMessages());
- reasm.Handle(ForwardTsnChunk(TSN(13), {}));
+ reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>());
EXPECT_EQ(reasm.queued_bytes(), 3u);
// The second lost chunk comes, message is assembled.
@@ -217,8 +218,8 @@
EXPECT_FALSE(reasm.HasMessages());
- reasm.Handle(ForwardTsnChunk(
- TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
+ reasm.HandleForwardTsn(
+ TSN(13), std::vector<SkippedStream>({SkippedStream(kStreamID, kSSN)}));
EXPECT_EQ(reasm.queued_bytes(), 0u);
// The lost chunk comes, but too late.
@@ -241,8 +242,8 @@
EXPECT_FALSE(reasm.HasMessages());
- reasm.Handle(ForwardTsnChunk(
- TSN(13), {ForwardTsnChunk::SkippedStream(kStreamID, kSSN)}));
+ reasm.HandleForwardTsn(
+ TSN(13), std::vector<SkippedStream>({SkippedStream(kStreamID, kSSN)}));
EXPECT_EQ(reasm.queued_bytes(), 0u);
// The lost chunk comes, but too late.
@@ -274,46 +275,24 @@
.Add(
HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks));
- reasm.Handle(ForwardTsnChunk(TSN(13), {}));
+ reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>());
EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
}
TEST_F(ReassemblyQueueTest, NotReadyForHandoverWhenResetStreamIsDeferred) {
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize);
- DataGeneratorOptions opts;
- opts.mid = MID(0);
- reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- opts.mid = MID(1);
- reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
+ reasm.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
+ reasm.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
- reasm.ResetStreams(
- OutgoingSSNResetRequestParameter(
- ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(13), {StreamID(1)}),
- TSN(11));
+ reasm.EnterDeferredReset(TSN(12), std::vector<StreamID>({StreamID(1)}));
EXPECT_EQ(reasm.GetHandoverReadiness(),
HandoverReadinessStatus().Add(
HandoverUnreadinessReason::kStreamResetDeferred));
- opts.mid = MID(3);
- opts.ppid = PPID(3);
- reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm.MaybeResetStreamsDeferred(TSN(11));
+ reasm.Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(2)}));
- opts.mid = MID(2);
- opts.ppid = PPID(2);
- reasm.Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm.MaybeResetStreamsDeferred(TSN(15));
- EXPECT_EQ(reasm.GetHandoverReadiness(),
- HandoverReadinessStatus().Add(
- HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap));
-
- EXPECT_THAT(reasm.FlushMessages(), SizeIs(2));
- EXPECT_EQ(reasm.GetHandoverReadiness(),
- HandoverReadinessStatus().Add(
- HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap));
-
- reasm.Handle(ForwardTsnChunk(TSN(15), {}));
+ reasm.ResetStreamsAndLeaveDeferredReset(std::vector<StreamID>({StreamID(1)}));
EXPECT_EQ(reasm.GetHandoverReadiness(), HandoverReadinessStatus());
}
@@ -427,9 +406,8 @@
ASSERT_FALSE(reasm.HasMessages());
EXPECT_EQ(reasm.queued_bytes(), 7u);
- reasm.Handle(
- IForwardTsnChunk(TSN(13), {IForwardTsnChunk::SkippedStream(
- IsUnordered(false), kStreamID, MID(0))}));
+ reasm.HandleForwardTsn(TSN(13), std::vector<SkippedStream>({SkippedStream(
+ IsUnordered(false), kStreamID, MID(0))}));
EXPECT_EQ(reasm.queued_bytes(), 0u);
// The lost chunk comes, but too late.
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 2e29a5a..32bcdaa 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -1102,7 +1102,7 @@
if (tcb_->data_tracker().Observe(tsn, immediate_ack)) {
tcb_->reassembly_queue().Add(tsn, std::move(data));
- MaybeResetStreamsDeferredAndDeliverMessages();
+ MaybeDeliverMessages();
}
}
@@ -1453,12 +1453,7 @@
callbacks_.OnConnected();
}
-void DcSctpSocket::MaybeResetStreamsDeferredAndDeliverMessages() {
- // As new data has been received, see if paused streams can be resumed, which
- // results in even more data added to the reassembly queue.
- tcb_->reassembly_queue().MaybeResetStreamsDeferred(
- tcb_->data_tracker().last_cumulative_acked_tsn());
-
+void DcSctpSocket::MaybeDeliverMessages() {
for (auto& message : tcb_->reassembly_queue().FlushMessages()) {
++metrics_.rx_messages_count;
callbacks_.OnMessageReceived(std::move(message));
@@ -1571,6 +1566,10 @@
// If a response was processed, pending to-be-reset streams may now have
// become unpaused. Try to send more DATA chunks.
tcb_->SendBufferedPackets(now);
+
+ // If it leaves "deferred reset processing", there may be chunks to deliver
+ // that were queued while waiting for the stream to reset.
+ MaybeDeliverMessages();
}
}
@@ -1710,12 +1709,12 @@
return;
}
if (tcb_->data_tracker().HandleForwardTsn(chunk.new_cumulative_tsn())) {
- tcb_->reassembly_queue().Handle(chunk);
+ tcb_->reassembly_queue().HandleForwardTsn(chunk.new_cumulative_tsn(),
+ chunk.skipped_streams());
}
- // A forward TSN - for ordered streams - may allow messages to be
- // delivered.
- MaybeResetStreamsDeferredAndDeliverMessages();
+ // A forward TSN - for ordered streams - may allow messages to be delivered.
+ MaybeDeliverMessages();
}
void DcSctpSocket::MaybeSendShutdownOrAck() {
diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h
index 4f7d178..f91eb3e 100644
--- a/net/dcsctp/socket/dcsctp_socket.h
+++ b/net/dcsctp/socket/dcsctp_socket.h
@@ -180,9 +180,8 @@
// sent and prints all chunks.
void DebugPrintOutgoing(rtc::ArrayView<const uint8_t> payload);
// Called whenever data has been received, or the cumulative acknowledgment
- // TSN has moved, that may result in performing deferred stream resetting and
- // delivering messages.
- void MaybeResetStreamsDeferredAndDeliverMessages();
+ // TSN has moved, that may result in delivering messages.
+ void MaybeDeliverMessages();
// Returns true if there is a TCB, and false otherwise (and reports an error).
bool ValidateHasTCB();
diff --git a/net/dcsctp/socket/stream_reset_handler.cc b/net/dcsctp/socket/stream_reset_handler.cc
index f9201ea..2094309 100644
--- a/net/dcsctp/socket/stream_reset_handler.cc
+++ b/net/dcsctp/socket/stream_reset_handler.cc
@@ -131,7 +131,7 @@
}
bool StreamResetHandler::ValidateReqSeqNbr(
- ReconfigRequestSN req_seq_nbr,
+ UnwrappedReconfigRequestSn req_seq_nbr,
std::vector<ReconfigurationResponseParameter>& responses) {
if (req_seq_nbr == last_processed_req_seq_nbr_) {
// https://www.rfc-editor.org/rfc/rfc6525.html#section-5.2.1 "If the
@@ -143,11 +143,11 @@
<< " already processed, returning result="
<< ToString(last_processed_req_result_);
responses.push_back(ReconfigurationResponseParameter(
- req_seq_nbr, last_processed_req_result_));
+ req_seq_nbr.Wrap(), last_processed_req_result_));
return false;
}
- if (req_seq_nbr != ReconfigRequestSN(*last_processed_req_seq_nbr_ + 1)) {
+ if (req_seq_nbr != last_processed_req_seq_nbr_.next_value()) {
// Too old, too new, from wrong association etc.
// This is expected to happen when handing over a RTCPeerConnection from one
// server to another. The client will notice this and may decide to close
@@ -156,7 +156,7 @@
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "req=" << *req_seq_nbr
<< " bad seq_nbr";
responses.push_back(ReconfigurationResponseParameter(
- req_seq_nbr, ResponseResult::kErrorBadSequenceNumber));
+ req_seq_nbr.Wrap(), ResponseResult::kErrorBadSequenceNumber));
return false;
}
@@ -174,16 +174,43 @@
return;
}
- if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_
- << "Reset outgoing streams with req_seq_nbr="
- << *req->request_sequence_number();
+ UnwrappedReconfigRequestSn request_sn =
+ incoming_reconfig_request_sn_unwrapper_.Unwrap(
+ req->request_sequence_number());
- last_processed_req_seq_nbr_ = req->request_sequence_number();
- last_processed_req_result_ = reassembly_queue_->ResetStreams(
- *req, data_tracker_->last_cumulative_acked_tsn());
- if (last_processed_req_result_ == ResponseResult::kSuccessPerformed) {
+ if (ValidateReqSeqNbr(request_sn, responses)) {
+ last_processed_req_seq_nbr_ = request_sn;
+ if (data_tracker_->IsLaterThanCumulativeAckedTsn(
+ req->sender_last_assigned_tsn())) {
+ // https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2
+ // E2) "If the Sender's Last Assigned TSN is greater than the cumulative
+ // acknowledgment point, then the endpoint MUST enter 'deferred reset
+ // processing'."
+ reassembly_queue_->EnterDeferredReset(req->sender_last_assigned_tsn(),
+ req->stream_ids());
+ // "If the endpoint enters 'deferred reset processing', it MUST put a
+ // Re-configuration Response Parameter into a RE-CONFIG chunk indicating
+ // 'In progress' and MUST send the RE-CONFIG chunk.
+ last_processed_req_result_ = ResponseResult::kInProgress;
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Reset outgoing; Sender last_assigned="
+ << *req->sender_last_assigned_tsn()
+ << " - not yet reached -> InProgress";
+ } else {
+ // https://datatracker.ietf.org/doc/html/rfc6525#section-5.2.2
+ // E3) If no stream numbers are listed in the parameter, then all incoming
+ // streams MUST be reset to 0 as the next expected SSN. If specific stream
+ // numbers are listed, then only these specific streams MUST be reset to
+ // 0, and all other non-listed SSNs remain unchanged. E4: Any queued TSNs
+ // (queued at step E2) MUST now be released and processed normally.
+ reassembly_queue_->ResetStreamsAndLeaveDeferredReset(req->stream_ids());
ctx_->callbacks().OnIncomingStreamsReset(req->stream_ids());
+ last_processed_req_result_ = ResponseResult::kSuccessPerformed;
+
+ RTC_DLOG(LS_VERBOSE) << log_prefix_
+ << "Reset outgoing; Sender last_assigned="
+ << *req->sender_last_assigned_tsn()
+ << " - reached -> SuccessPerformed";
}
responses.push_back(ReconfigurationResponseParameter(
req->request_sequence_number(), last_processed_req_result_));
@@ -200,10 +227,15 @@
"Failed to parse Incoming Reset command");
return;
}
- if (ValidateReqSeqNbr(req->request_sequence_number(), responses)) {
+
+ UnwrappedReconfigRequestSn request_sn =
+ incoming_reconfig_request_sn_unwrapper_.Unwrap(
+ req->request_sequence_number());
+
+ if (ValidateReqSeqNbr(request_sn, responses)) {
responses.push_back(ReconfigurationResponseParameter(
req->request_sequence_number(), ResponseResult::kSuccessNothingToDo));
- last_processed_req_seq_nbr_ = req->request_sequence_number();
+ last_processed_req_seq_nbr_ = request_sn;
}
}
@@ -345,7 +377,8 @@
}
void StreamResetHandler::AddHandoverState(DcSctpSocketHandoverState& state) {
- state.rx.last_completed_reset_req_sn = last_processed_req_seq_nbr_.value();
+ state.rx.last_completed_reset_req_sn =
+ last_processed_req_seq_nbr_.Wrap().value();
state.tx.next_reset_req_sn = next_outgoing_req_seq_nbr_.value();
}
diff --git a/net/dcsctp/socket/stream_reset_handler.h b/net/dcsctp/socket/stream_reset_handler.h
index 8140903..c335130 100644
--- a/net/dcsctp/socket/stream_reset_handler.h
+++ b/net/dcsctp/socket/stream_reset_handler.h
@@ -86,9 +86,11 @@
? ReconfigRequestSN(handover_state->tx.next_reset_req_sn)
: ReconfigRequestSN(*ctx_->my_initial_tsn())),
last_processed_req_seq_nbr_(
- handover_state ? ReconfigRequestSN(
- handover_state->rx.last_completed_reset_req_sn)
- : ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1)),
+ incoming_reconfig_request_sn_unwrapper_.Unwrap(
+ handover_state
+ ? ReconfigRequestSN(
+ handover_state->rx.last_completed_reset_req_sn)
+ : ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1))),
last_processed_req_result_(
ReconfigurationResponseParameter::Result::kSuccessNothingToDo) {}
@@ -113,6 +115,7 @@
void AddHandoverState(DcSctpSocketHandoverState& state);
private:
+ using UnwrappedReconfigRequestSn = UnwrappedSequenceNumber<ReconfigRequestSN>;
// Represents a stream request operation. There can only be one ongoing at
// any time, and a sent request may either succeed, fail or result in the
// receiver signaling that it can't process it right now, and then it will be
@@ -185,7 +188,7 @@
// fails to validate, and returns false, it will also add a response to
// `responses`.
bool ValidateReqSeqNbr(
- ReconfigRequestSN req_seq_nbr,
+ UnwrappedReconfigRequestSn req_seq_nbr,
std::vector<ReconfigurationResponseParameter>& responses);
// Called when this socket receives an outgoing stream reset request. It might
@@ -215,6 +218,7 @@
DataTracker* data_tracker_;
ReassemblyQueue* reassembly_queue_;
RetransmissionQueue* retransmission_queue_;
+ UnwrappedReconfigRequestSn::Unwrapper incoming_reconfig_request_sn_unwrapper_;
const std::unique_ptr<Timer> reconfig_timer_;
// The next sequence number for outgoing stream requests.
@@ -224,7 +228,7 @@
absl::optional<CurrentRequest> current_request_;
// For incoming requests - last processed request sequence number.
- ReconfigRequestSN last_processed_req_seq_nbr_;
+ UnwrappedReconfigRequestSn last_processed_req_seq_nbr_;
// The result from last processed incoming request
ReconfigurationResponseParameter::Result last_processed_req_result_;
};
diff --git a/net/dcsctp/socket/stream_reset_handler_test.cc b/net/dcsctp/socket/stream_reset_handler_test.cc
index 991f182..091d717 100644
--- a/net/dcsctp/socket/stream_reset_handler_test.cc
+++ b/net/dcsctp/socket/stream_reset_handler_test.cc
@@ -20,12 +20,14 @@
#include "api/task_queue/task_queue_base.h"
#include "net/dcsctp/common/handover_testing.h"
#include "net/dcsctp/common/internal_types.h"
+#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/chunk/reconfig_chunk.h"
#include "net/dcsctp/packet/parameter/incoming_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
+#include "net/dcsctp/public/types.h"
#include "net/dcsctp/rx/data_tracker.h"
#include "net/dcsctp/rx/reassembly_queue.h"
#include "net/dcsctp/socket/mock_context.h"
@@ -42,10 +44,12 @@
namespace {
using ::testing::IsEmpty;
using ::testing::NiceMock;
+using ::testing::Property;
using ::testing::Return;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
using ResponseResult = ReconfigurationResponseParameter::Result;
+using SkippedStream = AnyForwardTsnChunk::SkippedStream;
constexpr TSN kMyInitialTsn = MockContext::MyInitialTsn();
constexpr ReconfigRequestSN kMyInitialReqSn = ReconfigRequestSN(*kMyInitialTsn);
@@ -289,61 +293,187 @@
}
TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
- DataGeneratorOptions opts;
- opts.mid = MID(0);
- reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
+ constexpr StreamID kStreamId = StreamID(1);
+ data_tracker_->Observe(TSN(10));
+ reasm_->Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(0)}));
- opts.mid = MID(1);
- reasm_->Add(AddTo(kPeerInitialTsn, 1),
- gen_.Ordered({1, 2, 3, 4}, "BE", opts));
-
- data_tracker_->Observe(kPeerInitialTsn);
- data_tracker_->Observe(AddTo(kPeerInitialTsn, 1));
- EXPECT_THAT(reasm_->FlushMessages(),
- UnorderedElementsAre(
- SctpMessageIs(StreamID(1), PPID(53), kShortPayload),
- SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
-
- Parameters::Builder builder;
- builder.Add(OutgoingSSNResetRequestParameter(
- kPeerInitialReqSn, ReconfigRequestSN(3), AddTo(kPeerInitialTsn, 3),
- {StreamID(1)}));
-
- std::vector<ReconfigurationResponseParameter> responses =
- HandleAndCatchResponse(ReConfigChunk(builder.Build()));
- EXPECT_THAT(responses, SizeIs(1));
- EXPECT_EQ(responses[0].result(), ResponseResult::kInProgress);
-
- opts.mid = MID(1);
- opts.ppid = PPID(5);
- reasm_->Add(AddTo(kPeerInitialTsn, 5),
- gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
-
- opts.mid = MID(0);
- opts.ppid = PPID(4);
- reasm_->Add(AddTo(kPeerInitialTsn, 4),
- gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
-
- opts.mid = MID(3);
- opts.ppid = PPID(3);
- reasm_->Add(AddTo(kPeerInitialTsn, 3),
- gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
-
- opts.mid = MID(2);
- opts.ppid = PPID(2);
- reasm_->Add(AddTo(kPeerInitialTsn, 2),
- gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 5));
+ data_tracker_->Observe(TSN(11));
+ reasm_->Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE", {.mid = MID(1)}));
EXPECT_THAT(
reasm_->FlushMessages(),
- UnorderedElementsAre(SctpMessageIs(StreamID(1), PPID(2), kShortPayload),
- SctpMessageIs(StreamID(1), PPID(3), kShortPayload),
- SctpMessageIs(StreamID(1), PPID(4), kShortPayload),
- SctpMessageIs(StreamID(1), PPID(5), kShortPayload)));
+ UnorderedElementsAre(SctpMessageIs(kStreamId, PPID(53), kShortPayload),
+ SctpMessageIs(kStreamId, PPID(53), kShortPayload)));
+
+ Parameters::Builder builder;
+ builder.Add(OutgoingSSNResetRequestParameter(
+ ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(13), {kStreamId}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kInProgress)));
+
+ data_tracker_->Observe(TSN(15));
+ reasm_->Add(TSN(15), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.mid = MID(1), .ppid = PPID(5)}));
+
+ data_tracker_->Observe(TSN(14));
+ reasm_->Add(TSN(14), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.mid = MID(0), .ppid = PPID(4)}));
+
+ data_tracker_->Observe(TSN(13));
+ reasm_->Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.mid = MID(3), .ppid = PPID(3)}));
+
+ data_tracker_->Observe(TSN(12));
+ reasm_->Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.mid = MID(2), .ppid = PPID(2)}));
+
+ builder.Add(OutgoingSSNResetRequestParameter(
+ ReconfigRequestSN(11), ReconfigRequestSN(4), TSN(13), {kStreamId}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kSuccessPerformed)));
+
+ EXPECT_THAT(
+ reasm_->FlushMessages(),
+ UnorderedElementsAre(SctpMessageIs(kStreamId, PPID(2), kShortPayload),
+ SctpMessageIs(kStreamId, PPID(3), kShortPayload),
+ SctpMessageIs(kStreamId, PPID(4), kShortPayload),
+ SctpMessageIs(kStreamId, PPID(5), kShortPayload)));
+}
+
+TEST_F(StreamResetHandlerTest, ResetStreamsDeferredOnlySelectedStreams) {
+ // This test verifies the receiving behavior of receiving messages on
+ // streams 1, 2 and 3, and receiving a reset request on stream 1, 2, causing
+ // deferred reset processing.
+
+ // Reset stream 1,2 with "last assigned TSN=12"
+ Parameters::Builder builder;
+ builder.Add(OutgoingSSNResetRequestParameter(ReconfigRequestSN(10),
+ ReconfigRequestSN(3), TSN(12),
+ {StreamID(1), StreamID(2)}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kInProgress)));
+
+ // TSN 10, SID 1 - before TSN 12 -> deliver
+ data_tracker_->Observe(TSN(10));
+ reasm_->Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(1),
+ .mid = MID(0),
+ .ppid = PPID(1001)}));
+
+ // TSN 11, SID 2 - before TSN 12 -> deliver
+ data_tracker_->Observe(TSN(11));
+ reasm_->Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(2),
+ .mid = MID(0),
+ .ppid = PPID(1002)}));
+
+ // TSN 12, SID 3 - at TSN 12 -> deliver
+ data_tracker_->Observe(TSN(12));
+ reasm_->Add(TSN(12), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(3),
+ .mid = MID(0),
+ .ppid = PPID(1003)}));
+
+ // TSN 13, SID 1 - after TSN 12 and SID=1 -> defer
+ data_tracker_->Observe(TSN(13));
+ reasm_->Add(TSN(13), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(1),
+ .mid = MID(0),
+ .ppid = PPID(1004)}));
+
+ // TSN 14, SID 2 - after TSN 12 and SID=2 -> defer
+ data_tracker_->Observe(TSN(14));
+ reasm_->Add(TSN(14), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(2),
+ .mid = MID(0),
+ .ppid = PPID(1005)}));
+
+ // TSN 15, SID 3 - after TSN 12, but SID 3 is not reset -> deliver
+ data_tracker_->Observe(TSN(15));
+ reasm_->Add(TSN(15), gen_.Ordered({1, 2, 3, 4}, "BE",
+ {.stream_id = StreamID(3),
+ .mid = MID(1),
+ .ppid = PPID(1006)}));
+
+ EXPECT_THAT(reasm_->FlushMessages(),
+ UnorderedElementsAre(
+ SctpMessageIs(StreamID(1), PPID(1001), kShortPayload),
+ SctpMessageIs(StreamID(2), PPID(1002), kShortPayload),
+ SctpMessageIs(StreamID(3), PPID(1003), kShortPayload),
+ SctpMessageIs(StreamID(3), PPID(1006), kShortPayload)));
+
+ builder.Add(OutgoingSSNResetRequestParameter(ReconfigRequestSN(11),
+ ReconfigRequestSN(3), TSN(13),
+ {StreamID(1), StreamID(2)}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kSuccessPerformed)));
+
+ EXPECT_THAT(reasm_->FlushMessages(),
+ UnorderedElementsAre(
+ SctpMessageIs(StreamID(1), PPID(1004), kShortPayload),
+ SctpMessageIs(StreamID(2), PPID(1005), kShortPayload)));
+}
+
+TEST_F(StreamResetHandlerTest, ResetStreamsDefersForwardTsn) {
+ // This test verifies that FORWARD-TSNs are deferred if they want to move
+ // the cumulative ack TSN point past sender's last assigned TSN.
+ static constexpr StreamID kStreamId = StreamID(42);
+
+ // Simulate sender sends:
+ // * TSN 10 (SSN=0, BE, lost),
+ // * TSN 11 (SSN=1, BE, lost),
+ // * TSN 12 (SSN=2, BE, lost)
+ // * RESET THE STREAM
+ // * TSN 13 (SSN=0, B, received)
+ // * TSN 14 (SSN=0, E, lost),
+ // * TSN 15 (SSN=1, BE, received)
+ Parameters::Builder builder;
+ builder.Add(OutgoingSSNResetRequestParameter(
+ ReconfigRequestSN(10), ReconfigRequestSN(3), TSN(12), {kStreamId}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kInProgress)));
+
+ // TSN 13, B, after TSN=12 -> defer
+ data_tracker_->Observe(TSN(13));
+ reasm_->Add(TSN(13),
+ gen_.Ordered(
+ {1, 2, 3, 4}, "B",
+ {.stream_id = kStreamId, .mid = MID(0), .ppid = PPID(1004)}));
+
+ // TSN 15, BE, after TSN=12 -> defer
+ data_tracker_->Observe(TSN(15));
+ reasm_->Add(TSN(15),
+ gen_.Ordered(
+ {1, 2, 3, 4}, "BE",
+ {.stream_id = kStreamId, .mid = MID(1), .ppid = PPID(1005)}));
+
+ // Time passes, sender decides to send FORWARD-TSN up to the RESET.
+ data_tracker_->HandleForwardTsn(TSN(12));
+ reasm_->HandleForwardTsn(
+ TSN(12), std::vector<SkippedStream>({SkippedStream(kStreamId, SSN(2))}));
+
+ // The receiver sends a SACK in response to that. The stream hasn't been
+ // reset yet, but the sender now decides that TSN=13-14 is to be skipped.
+ // As this has a TSN 14, after TSN=12 -> defer it.
+ data_tracker_->HandleForwardTsn(TSN(14));
+ reasm_->HandleForwardTsn(
+ TSN(14), std::vector<SkippedStream>({SkippedStream(kStreamId, SSN(0))}));
+
+ // Reset the stream -> deferred TSNs should be re-added.
+ builder.Add(OutgoingSSNResetRequestParameter(
+ ReconfigRequestSN(11), ReconfigRequestSN(3), TSN(12), {kStreamId}));
+ EXPECT_THAT(HandleAndCatchResponse(ReConfigChunk(builder.Build())),
+ ElementsAre(Property(&ReconfigurationResponseParameter::result,
+ ResponseResult::kSuccessPerformed)));
+
+ EXPECT_THAT(reasm_->FlushMessages(),
+ UnorderedElementsAre(
+ SctpMessageIs(kStreamId, PPID(1005), kShortPayload)));
}
TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) {
@@ -767,7 +897,6 @@
DataGeneratorOptions opts;
opts.mid = MID(0);
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
- reasm_->MaybeResetStreamsDeferred(kPeerInitialTsn);
data_tracker_->Observe(kPeerInitialTsn);
// And emulate that time has passed, and the peer retries the stream reset,