| /* |
| * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| #include "net/dcsctp/rx/interleaved_reassembly_streams.h" |
| |
| #include <stddef.h> |
| |
| #include <cstdint> |
| #include <functional> |
| #include <iterator> |
| #include <map> |
| #include <numeric> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "absl/algorithm/container.h" |
| #include "api/array_view.h" |
| #include "net/dcsctp/common/sequence_numbers.h" |
| #include "net/dcsctp/packet/chunk/forward_tsn_common.h" |
| #include "net/dcsctp/packet/data.h" |
| #include "net/dcsctp/public/types.h" |
| #include "rtc_base/logging.h" |
| |
| namespace dcsctp { |
| |
| InterleavedReassemblyStreams::InterleavedReassemblyStreams( |
| absl::string_view log_prefix, |
| OnAssembledMessage on_assembled_message) |
| : log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {} |
| |
| size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage( |
| UnwrappedMID mid) { |
| std::map<UnwrappedMID, ChunkMap>::iterator it = chunks_by_mid_.find(mid); |
| if (it == chunks_by_mid_.end()) { |
| RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " |
| << *mid.Wrap() << " - no chunks"; |
| return 0; |
| } |
| ChunkMap& chunks = it->second; |
| if (!chunks.begin()->second.second.is_beginning || |
| !chunks.rbegin()->second.second.is_end) { |
| RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " |
| << *mid.Wrap() << "- missing beginning or end"; |
| return 0; |
| } |
| int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first; |
| if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) { |
| RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " |
| << *mid.Wrap() << "- not all chunks exist (have " |
| << chunks.size() << ", expect " << (fsn_diff + 1) |
| << ")"; |
| return 0; |
| } |
| |
| size_t removed_bytes = AssembleMessage(chunks); |
| RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage " |
| << *mid.Wrap() << " - succeeded and removed " |
| << removed_bytes; |
| |
| chunks_by_mid_.erase(mid); |
| return removed_bytes; |
| } |
| |
| size_t InterleavedReassemblyStreams::Stream::AssembleMessage(UnwrappedTSN tsn, |
| Data data) { |
| size_t payload_size = data.size(); |
| UnwrappedTSN tsns[1] = {tsn}; |
| DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload)); |
| parent_.on_assembled_message_(tsns, std::move(message)); |
| return payload_size; |
| } |
| |
| size_t InterleavedReassemblyStreams::Stream::AssembleMessage( |
| ChunkMap& tsn_chunks) { |
| size_t count = tsn_chunks.size(); |
| if (count == 1) { |
| // Fast path - zero-copy |
| return AssembleMessage(tsn_chunks.begin()->second.first, |
| std::move(tsn_chunks.begin()->second.second)); |
| } |
| |
| // Slow path - will need to concatenate the payload. |
| std::vector<UnwrappedTSN> tsns; |
| tsns.reserve(count); |
| |
| std::vector<uint8_t> payload; |
| size_t payload_size = absl::c_accumulate( |
| tsn_chunks, 0, |
| [](size_t v, const auto& p) { return v + p.second.second.size(); }); |
| payload.reserve(payload_size); |
| |
| for (auto& item : tsn_chunks) { |
| const UnwrappedTSN tsn = item.second.first; |
| const Data& data = item.second.second; |
| tsns.push_back(tsn); |
| payload.insert(payload.end(), data.payload.begin(), data.payload.end()); |
| } |
| |
| const Data& data = tsn_chunks.begin()->second.second; |
| |
| DcSctpMessage message(data.stream_id, data.ppid, std::move(payload)); |
| parent_.on_assembled_message_(tsns, std::move(message)); |
| return payload_size; |
| } |
| |
| size_t InterleavedReassemblyStreams::Stream::EraseTo(MID mid) { |
| UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(mid); |
| |
| size_t removed_bytes = 0; |
| auto it = chunks_by_mid_.begin(); |
| while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) { |
| removed_bytes += absl::c_accumulate( |
| it->second, 0, |
| [](size_t r2, const auto& q) { return r2 + q.second.second.size(); }); |
| it = chunks_by_mid_.erase(it); |
| } |
| |
| if (!stream_id_.unordered) { |
| // For ordered streams, erasing a message might suddenly unblock that queue |
| // and allow it to deliver any following received messages. |
| if (unwrapped_mid >= next_mid_) { |
| next_mid_ = unwrapped_mid.next_value(); |
| } |
| |
| removed_bytes += TryToAssembleMessages(); |
| } |
| |
| return removed_bytes; |
| } |
| |
| int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) { |
| RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered); |
| RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id); |
| int queued_bytes = data.size(); |
| UnwrappedMID mid = mid_unwrapper_.Unwrap(data.mid); |
| FSN fsn = data.fsn; |
| |
| // Avoid inserting it into any map if it can be delivered directly. |
| if (stream_id_.unordered && data.is_beginning && data.is_end) { |
| AssembleMessage(tsn, std::move(data)); |
| return 0; |
| |
| } else if (!stream_id_.unordered && mid == next_mid_ && data.is_beginning && |
| data.is_end) { |
| AssembleMessage(tsn, std::move(data)); |
| next_mid_.Increment(); |
| // This might unblock assembling more messages. |
| return -TryToAssembleMessages(); |
| } |
| |
| // Slow path. |
| auto [unused, inserted] = |
| chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data))); |
| if (!inserted) { |
| return 0; |
| } |
| |
| if (stream_id_.unordered) { |
| queued_bytes -= TryToAssembleMessage(mid); |
| } else { |
| if (mid == next_mid_) { |
| queued_bytes -= TryToAssembleMessages(); |
| } |
| } |
| |
| return queued_bytes; |
| } |
| |
| size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() { |
| size_t removed_bytes = 0; |
| |
| for (;;) { |
| size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_); |
| if (removed_bytes_this_iter == 0) { |
| break; |
| } |
| |
| removed_bytes += removed_bytes_this_iter; |
| next_mid_.Increment(); |
| } |
| return removed_bytes; |
| } |
| |
| void InterleavedReassemblyStreams::Stream::AddHandoverState( |
| DcSctpSocketHandoverState& state) const { |
| if (stream_id_.unordered) { |
| DcSctpSocketHandoverState::UnorderedStream state_stream; |
| state_stream.id = stream_id_.stream_id.value(); |
| state.rx.unordered_streams.push_back(std::move(state_stream)); |
| } else { |
| DcSctpSocketHandoverState::OrderedStream state_stream; |
| state_stream.id = stream_id_.stream_id.value(); |
| state_stream.next_ssn = next_mid_.Wrap().value(); |
| state.rx.ordered_streams.push_back(std::move(state_stream)); |
| } |
| } |
| |
| InterleavedReassemblyStreams::Stream& |
| InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) { |
| auto it = streams_.find(stream_id); |
| if (it == streams_.end()) { |
| it = |
| streams_ |
| .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), |
| std::forward_as_tuple(stream_id, this)) |
| .first; |
| } |
| return it->second; |
| } |
| |
| int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) { |
| return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id)) |
| .Add(tsn, std::move(data)); |
| } |
| |
| size_t InterleavedReassemblyStreams::HandleForwardTsn( |
| UnwrappedTSN new_cumulative_ack_tsn, |
| rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) { |
| size_t removed_bytes = 0; |
| for (const auto& skipped : skipped_streams) { |
| removed_bytes += |
| GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id)) |
| .EraseTo(skipped.mid); |
| } |
| return removed_bytes; |
| } |
| |
| void InterleavedReassemblyStreams::ResetStreams( |
| rtc::ArrayView<const StreamID> stream_ids) { |
| if (stream_ids.empty()) { |
| for (auto& entry : streams_) { |
| entry.second.Reset(); |
| } |
| } else { |
| for (StreamID stream_id : stream_ids) { |
| GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset(); |
| GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset(); |
| } |
| } |
| } |
| |
| HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness() |
| const { |
| HandoverReadinessStatus status; |
| for (const auto& [stream_id, stream] : streams_) { |
| if (stream.has_unassembled_chunks()) { |
| status.Add( |
| stream_id.unordered |
| ? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks |
| : HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks); |
| break; |
| } |
| } |
| return status; |
| } |
| |
| void InterleavedReassemblyStreams::AddHandoverState( |
| DcSctpSocketHandoverState& state) { |
| for (const auto& [unused, stream] : streams_) { |
| stream.AddHandoverState(state); |
| } |
| } |
| |
| void InterleavedReassemblyStreams::RestoreFromState( |
| const DcSctpSocketHandoverState& state) { |
| // Validate that the component is in pristine state. |
| RTC_DCHECK(streams_.empty()); |
| |
| for (const DcSctpSocketHandoverState::OrderedStream& state : |
| state.rx.ordered_streams) { |
| FullStreamId stream_id(IsUnordered(false), StreamID(state.id)); |
| streams_.emplace( |
| std::piecewise_construct, std::forward_as_tuple(stream_id), |
| std::forward_as_tuple(stream_id, this, MID(state.next_ssn))); |
| } |
| for (const DcSctpSocketHandoverState::UnorderedStream& state : |
| state.rx.unordered_streams) { |
| FullStreamId stream_id(IsUnordered(true), StreamID(state.id)); |
| streams_.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id), |
| std::forward_as_tuple(stream_id, this)); |
| } |
| } |
| |
| } // namespace dcsctp |