/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "net/dcsctp/rx/reassembly_queue.h"

#include <stddef.h>

#include <algorithm>
#include <cstdint>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/common/str_join.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include "rtc_base/logging.h"

namespace dcsctp {
namespace {
// Creates the stream reassembly backend used by the queue:
// `InterleavedReassemblyStreams` when `use_message_interleaving` is set,
// `TraditionalReassemblyStreams` otherwise. Both receive `log_prefix` and take
// ownership of the `on_assembled_message` callback.
std::unique_ptr<ReassemblyStreams> CreateStreams(
    absl::string_view log_prefix,
    ReassemblyStreams::OnAssembledMessage on_assembled_message,
    bool use_message_interleaving) {
  std::unique_ptr<ReassemblyStreams> streams;
  if (!use_message_interleaving) {
    streams = std::make_unique<TraditionalReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  } else {
    streams = std::make_unique<InterleavedReassemblyStreams>(
        log_prefix, std::move(on_assembled_message));
  }
  return streams;
}
}  // namespace

// Constructs a reassembly queue.
//
// `log_prefix` is stored and prepended to this class's log lines, and is also
// handed to the stream reassembly backend. `peer_initial_tsn` seeds the
// "last assembled" watermark. `max_size_bytes` is the nominal buffer size; it
// is not actively enforced in this file (see IsConsistent()).
// `use_message_interleaving` selects which backend CreateStreams() builds.
//
// NOTE(review): `streams_` is initialized from the member `log_prefix_`, and
// `last_assembled_tsn_watermark_` uses `tsn_unwrapper_`, so this relies on
// those members being declared earlier in the class - confirm in the header.
ReassemblyQueue::ReassemblyQueue(absl::string_view log_prefix,
                                 TSN peer_initial_tsn,
                                 size_t max_size_bytes,
                                 bool use_message_interleaving)
    : log_prefix_(log_prefix),
      max_size_bytes_(max_size_bytes),
      // High watermark: `max_size_bytes` scaled by `kHighWatermarkLimit`.
      watermark_bytes_(max_size_bytes * kHighWatermarkLimit),
      // Start one TSN before the peer's initial TSN, so that
      // `peer_initial_tsn` is the watermark's next value.
      last_assembled_tsn_watermark_(
          tsn_unwrapper_.Unwrap(TSN(*peer_initial_tsn - 1))),
      last_completed_reset_req_seq_nbr_(ReconfigRequestSN(0)),
      // The callback captures `this` and forwards every assembled message to
      // AddReassembledMessage().
      streams_(CreateStreams(
          log_prefix_,
          [this](rtc::ArrayView<const UnwrappedTSN> tsns,
                 DcSctpMessage message) {
            AddReassembledMessage(tsns, std::move(message));
          },
          use_message_interleaving)) {}

// Handles a received data chunk identified by `tsn`.
//
// A chunk whose TSN is at or below the assembled watermark, or is present in
// `delivered_tsns_`, is dropped. While a stream reset is deferred, a chunk
// beyond the request's "sender's last assigned TSN" is parked in the deferred
// list and its size is added to `queued_bytes_`. Any other chunk is passed to
// the stream reassembly backend, and `queued_bytes_` grows by the value that
// the backend returns.
void ReassemblyQueue::Add(TSN tsn, Data data) {
  RTC_DCHECK(IsConsistent());
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "added tsn=" << *tsn
                       << ", stream=" << *data.stream_id << ":"
                       << *data.message_id << ":" << *data.fsn << ", type="
                       << (data.is_beginning && data.is_end ? "complete"
                           : data.is_beginning              ? "first"
                           : data.is_end                    ? "last"
                                                            : "middle");

  const UnwrappedTSN unwrapped_tsn = tsn_unwrapper_.Unwrap(tsn);

  const bool already_delivered =
      unwrapped_tsn <= last_assembled_tsn_watermark_ ||
      delivered_tsns_.count(unwrapped_tsn) > 0;
  if (already_delivered) {
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Chunk has already been delivered - skipping";
    return;
  }

  // If a stream reset has been received with a "sender's last assigned tsn" in
  // the future, the socket is in "deferred reset processing" mode and must
  // buffer chunks until it's exited. The limit is only unwrapped while in that
  // mode.
  bool must_defer = false;
  if (deferred_reset_streams_.has_value()) {
    const UnwrappedTSN deferral_limit = tsn_unwrapper_.Unwrap(
        deferred_reset_streams_->req.sender_last_assigned_tsn());
    must_defer = unwrapped_tsn > deferral_limit;
  }

  if (!must_defer) {
    queued_bytes_ += streams_->Add(unwrapped_tsn, std::move(data));
  } else {
    RTC_DLOG(LS_VERBOSE)
        << log_prefix_ << "Deferring chunk with tsn=" << *tsn
        << " until cum_ack_tsn="
        << *deferred_reset_streams_->req.sender_last_assigned_tsn();
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "In this mode, any data arriving with a TSN larger than the
    // Sender's Last Assigned TSN for the affected stream(s) MUST be queued
    // locally and held until the cumulative acknowledgment point reaches the
    // Sender's Last Assigned TSN."
    // The size must be read before `data` is moved into the deferred list.
    queued_bytes_ += data.size();
    deferred_reset_streams_->deferred_chunks.emplace_back(tsn,
                                                          std::move(data));
  }

  // https://tools.ietf.org/html/rfc4960#section-6.9
  // "Note: If the data receiver runs out of buffer space while still
  // waiting for more fragments to complete the reassembly of the message, it
  // should dispatch part of its inbound message through a partial delivery
  // API (see Section 10), freeing some of its receive buffer space so that
  // the rest of the message may be received."

  // TODO(boivie): Support EOR flag and partial delivery?
  RTC_DCHECK(IsConsistent());
}

// Handles an incoming "outgoing SSN reset" request.
//
// Returns kInProgress if a reset is already deferred, or if this request has
// to be deferred because its "sender's last assigned TSN" is ahead of
// `cum_tsn_ack`. Returns kSuccessPerformed if the request's sequence number is
// not newer than the last completed one, or after resetting the streams
// immediately.
ReconfigurationResponseParameter::Result ReassemblyQueue::ResetStreams(
    const OutgoingSSNResetRequestParameter& req,
    TSN cum_tsn_ack) {
  using Result = ReconfigurationResponseParameter::Result;

  RTC_DCHECK(IsConsistent());
  if (deferred_reset_streams_.has_value()) {
    // In deferred mode already.
    return Result::kInProgress;
  }
  if (req.request_sequence_number() <= last_completed_reset_req_seq_nbr_) {
    // Already performed at some time previously.
    return Result::kSuccessPerformed;
  }

  const UnwrappedTSN last_assigned_tsn =
      tsn_unwrapper_.Unwrap(req.sender_last_assigned_tsn());
  const UnwrappedTSN cumulative_ack = tsn_unwrapper_.Unwrap(cum_tsn_ack);

  if (!(last_assigned_tsn > cumulative_ack)) {
    // https://tools.ietf.org/html/rfc6525#section-5.2.2
    // "... streams MUST be reset to 0 as the next expected SSN."
    streams_->ResetStreams(req.stream_ids());
    last_completed_reset_req_seq_nbr_ = req.request_sequence_number();
    RTC_DCHECK(IsConsistent());
    return Result::kSuccessPerformed;
  }

  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  // "If the Sender's Last Assigned TSN is greater than the
  // cumulative acknowledgment point, then the endpoint MUST enter "deferred
  // reset processing"."
  RTC_DLOG(LS_VERBOSE)
      << log_prefix_
      << "Entering deferred reset processing mode until cum_tsn_ack="
      << *req.sender_last_assigned_tsn();
  deferred_reset_streams_ = absl::make_optional<DeferredResetStreams>(req);
  return Result::kInProgress;
}

// Leaves "deferred reset processing" mode once `cum_ack_tsn` has reached the
// pending request's "sender's last assigned TSN".
//
// Returns true if the deferred stream reset was performed by this call (the
// streams were reset and all deferred chunks were fed back through Add()).
// Returns false if no reset is deferred, or if `cum_ack_tsn` has not yet
// reached the required TSN.
bool ReassemblyQueue::MaybeResetStreamsDeferred(TSN cum_ack_tsn) {
  RTC_DCHECK(IsConsistent());
  if (!deferred_reset_streams_.has_value()) {
    return false;
  }

  UnwrappedTSN unwrapped_cum_ack_tsn = tsn_unwrapper_.Unwrap(cum_ack_tsn);
  UnwrappedTSN unwrapped_sla_tsn = tsn_unwrapper_.Unwrap(
      deferred_reset_streams_->req.sender_last_assigned_tsn());
  if (unwrapped_cum_ack_tsn < unwrapped_sla_tsn) {
    // Prefixed like every other log line in this class, so that it can be
    // attributed to the right socket.
    RTC_DLOG(LS_VERBOSE) << log_prefix_
                         << "Staying in deferred reset processing. tsn="
                         << *cum_ack_tsn;
    return false;
  }

  RTC_DLOG(LS_VERBOSE) << log_prefix_
                       << "Leaving deferred reset processing with tsn="
                       << *cum_ack_tsn << ", feeding back "
                       << deferred_reset_streams_->deferred_chunks.size()
                       << " chunks";
  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  // "... streams MUST be reset to 0 as the next expected SSN."
  streams_->ResetStreams(deferred_reset_streams_->req.stream_ids());
  // Take the chunks out before clearing the deferred state below; Add() must
  // not see the deferred mode anymore, or it could defer them again.
  std::vector<std::pair<TSN, Data>> deferred_chunks =
      std::move(deferred_reset_streams_->deferred_chunks);
  // The response will not be sent now, but as a reply to the retried
  // request, which will come as "in progress" has been sent prior.
  last_completed_reset_req_seq_nbr_ =
      deferred_reset_streams_->req.request_sequence_number();
  deferred_reset_streams_ = absl::nullopt;

  // https://tools.ietf.org/html/rfc6525#section-5.2.2
  // "Any queued TSNs (queued at step E2) MUST now be released and processed
  // normally."
  for (auto& [tsn, data] : deferred_chunks) {
    // The size was accounted for when the chunk was deferred; Add() accounts
    // for it again, so remove it first.
    queued_bytes_ -= data.size();
    Add(tsn, std::move(data));
  }

  RTC_DCHECK(IsConsistent());
  return true;
}

// Returns all messages that have been reassembled so far and leaves the
// internal list of reassembled messages empty.
std::vector<DcSctpMessage> ReassemblyQueue::FlushMessages() {
  return std::exchange(reassembled_messages_, {});
}

// Callback invoked with a fully assembled `message` and the TSNs (`tsns`) of
// the chunks it was built from.
//
// Each TSN either advances the assembled watermark (when it is the next
// expected value) or is recorded in `delivered_tsns_`. If any TSN is at or
// below the watermark, the message is dropped; bookkeeping already done for
// earlier TSNs of the same message is kept. Otherwise the message is appended
// to the list returned by FlushMessages().
void ReassemblyQueue::AddReassembledMessage(
    rtc::ArrayView<const UnwrappedTSN> tsns,
    DcSctpMessage message) {
  RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Assembled message from TSN=["
                       << StrJoin(tsns, ",",
                                  [](rtc::StringBuilder& sb, UnwrappedTSN tsn) {
                                    sb << *tsn.Wrap();
                                  })
                       << "], message; stream_id=" << *message.stream_id()
                       << ", ppid=" << *message.ppid()
                       << ", payload=" << message.payload().size() << " bytes";

  for (const UnwrappedTSN tsn : tsns) {
    if (tsn <= last_assembled_tsn_watermark_) {
      // This can be provoked by a misbehaving peer by sending FORWARD-TSN with
      // invalid SSNs, allowing ordered messages to stay in the queue that
      // should've been discarded.
      RTC_DLOG(LS_VERBOSE)
          << log_prefix_
          << "Message is built from fragments already seen - skipping";
      return;
    }

    if (tsn == last_assembled_tsn_watermark_.next_value()) {
      // Directly follows the watermark - just move the watermark forward.
      last_assembled_tsn_watermark_.Increment();
      continue;
    }

    // There is a gap between the watermark and this TSN - remember it.
    delivered_tsns_.insert(tsn);
  }

  // With new TSNs in delivered_tsns, gaps might be filled.
  MaybeMoveLastAssembledWatermarkFurther();

  reassembled_messages_.push_back(std::move(message));
}

// Advances `last_assembled_tsn_watermark_` over every entry at the front of
// `delivered_tsns_` that directly follows it, removing those entries.
void ReassemblyQueue::MaybeMoveLastAssembledWatermarkFurther() {
  // `delivered_tsns_` contain TSNS when there is a gap between ranges of
  // assembled TSNs. `last_assembled_tsn_watermark_` should not be adjacent to
  // that list, because if so, it can be moved.
  for (auto it = delivered_tsns_.begin();
       it != delivered_tsns_.end() &&
       *it == last_assembled_tsn_watermark_.next_value();
       it = delivered_tsns_.erase(it)) {
    last_assembled_tsn_watermark_.Increment();
  }
}

void ReassemblyQueue::Handle(const AnyForwardTsnChunk& forward_tsn) {
  RTC_DCHECK(IsConsistent());
  UnwrappedTSN tsn = tsn_unwrapper_.Unwrap(forward_tsn.new_cumulative_tsn());

  last_assembled_tsn_watermark_ = std::max(last_assembled_tsn_watermark_, tsn);
  delivered_tsns_.erase(delivered_tsns_.begin(),
                        delivered_tsns_.upper_bound(tsn));

  MaybeMoveLastAssembledWatermarkFurther();

  queued_bytes_ -=
      streams_->HandleForwardTsn(tsn, forward_tsn.skipped_streams());
  RTC_DCHECK(IsConsistent());
}

// Verifies the internal invariants of the queue; used in debug checks.
// Returns false if the watermark overlaps or is adjacent to `delivered_tsns_`,
// or if `queued_bytes_` is outside the range [0, 2 * max_size_bytes_].
bool ReassemblyQueue::IsConsistent() const {
  // `delivered_tsns_` and `last_assembled_tsn_watermark_` mustn't overlap or be
  // adjacent.
  const bool watermark_touches_delivered =
      !delivered_tsns_.empty() &&
      last_assembled_tsn_watermark_.next_value() >= *delivered_tsns_.begin();

  // Allow queued_bytes_ to be larger than max_size_bytes, as it's not actively
  // enforced in this class. This comparison will still trigger if queued_bytes_
  // became "negative".
  return !watermark_touches_delivered &&
         (queued_bytes_ >= 0 && queued_bytes_ <= 2 * max_size_bytes_);
}

// Reports the handover readiness of the queue. Starts from the readiness
// reported by the stream reassembly backend, and adds a reason when there are
// gaps among delivered TSNs and another when a stream reset is deferred.
HandoverReadinessStatus ReassemblyQueue::GetHandoverReadiness() const {
  HandoverReadinessStatus readiness = streams_->GetHandoverReadiness();

  const bool has_delivered_tsn_gap = !delivered_tsns_.empty();
  if (has_delivered_tsn_gap) {
    readiness.Add(HandoverUnreadinessReason::kReassemblyQueueDeliveredTSNsGap);
  }

  const bool reset_is_deferred = deferred_reset_streams_.has_value();
  if (reset_is_deferred) {
    readiness.Add(HandoverUnreadinessReason::kStreamResetDeferred);
  }

  return readiness;
}

// Writes this queue's state into the receive part of `state`: the wrapped
// value of the assembled watermark and the sequence number of the last
// completed reset request. The stream reassembly backend then adds its own
// state.
void ReassemblyQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
  auto& rx_state = state.rx;
  rx_state.last_assembled_tsn = last_assembled_tsn_watermark_.Wrap().value();
  rx_state.last_completed_deferred_reset_req_sn =
      last_completed_reset_req_seq_nbr_.value();

  streams_->AddHandoverState(state);
}

void ReassemblyQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
  // Validate that the component is in pristine state.
  RTC_DCHECK(last_completed_reset_req_seq_nbr_ == ReconfigRequestSN(0));

  last_assembled_tsn_watermark_ =
      tsn_unwrapper_.Unwrap(TSN(state.rx.last_assembled_tsn));
  last_completed_reset_req_seq_nbr_ =
      ReconfigRequestSN(state.rx.last_completed_deferred_reset_req_sn);
  streams_->RestoreFromState(state);
}
}  // namespace dcsctp
