/*
 *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_

#include <cstdint>
#include <deque>
#include <map>
#include <string>
#include <utility>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"

namespace dcsctp {

// The Round Robin SendQueue holds all messages that the client wants to send,
// but that haven't yet been split into chunks and fully sent on the wire.
//
// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
// it will cycle to send messages from different streams. It will send all
// fragments from one message before continuing with a different message on
// possibly a different stream, until support for message interleaving has been
// implemented.
//
// As messages can be (requested to be) sent before the connection is properly
// established, this send queue is always present - even for closed connections.
class RRSendQueue : public SendQueue {
 public:
  RRSendQueue(absl::string_view log_prefix,
              size_t buffer_size,
              StreamPriority default_priority,
              std::function<void(StreamID)> on_buffered_amount_low,
              size_t total_buffered_amount_low_threshold,
              std::function<void()> on_total_buffered_amount_low);

  // Indicates if the buffer is full. Note that it's up to the caller to ensure
  // that the buffer is not full prior to adding new items to it.
  bool IsFull() const;
  // Indicates if the buffer is empty.
  bool IsEmpty() const;

  // Adds the message to be sent using the `send_options` provided. The current
  // time should be in `now`. Note that it's the responsibility of the caller to
  // ensure that the buffer is not full (by calling `IsFull`) before adding
  // messages to it.
  void Add(TimeMs now,
           DcSctpMessage message,
           const SendOptions& send_options = {});

  // Implementation of `SendQueue`.
  absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
  bool Discard(IsUnordered unordered,
               StreamID stream_id,
               MID message_id) override;
  void PrepareResetStream(StreamID streams) override;
  bool HasStreamsReadyToBeReset() const override;
  std::vector<StreamID> GetStreamsReadyToBeReset() override;
  void CommitResetStreams() override;
  void RollbackResetStreams() override;
  void Reset() override;
  size_t buffered_amount(StreamID stream_id) const override;
  size_t total_buffered_amount() const override {
    return total_buffered_amount_.value();
  }
  size_t buffered_amount_low_threshold(StreamID stream_id) const override;
  void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;

  void SetStreamPriority(StreamID stream_id, StreamPriority priority);
  StreamPriority GetStreamPriority(StreamID stream_id) const;
  HandoverReadinessStatus GetHandoverReadiness() const;
  void AddHandoverState(DcSctpSocketHandoverState& state);
  void RestoreFromState(const DcSctpSocketHandoverState& state);

 private:
  // Represents a value and a "low threshold" that when the value reaches or
  // goes under the "low threshold", will trigger `on_threshold_reached`
  // callback.
  class ThresholdWatcher {
   public:
    explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
        : on_threshold_reached_(std::move(on_threshold_reached)) {}
    // Increases the value.
    void Increase(size_t bytes) { value_ += bytes; }
    // Decreases the value and triggers `on_threshold_reached` if it's at or
    // below `low_threshold()`.
    void Decrease(size_t bytes);

    size_t value() const { return value_; }
    size_t low_threshold() const { return low_threshold_; }
    void SetLowThreshold(size_t low_threshold);

   private:
    const std::function<void()> on_threshold_reached_;
    size_t value_ = 0;
    size_t low_threshold_ = 0;
  };

  // Per-stream information.
  class OutgoingStream {
   public:
    OutgoingStream(
        StreamID stream_id,
        StreamPriority priority,
        std::function<void()> on_buffered_amount_low,
        ThresholdWatcher& total_buffered_amount,
        const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
        : stream_id_(stream_id),
          priority_(priority),
          next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
          next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
          next_ssn_(SSN(state ? state->next_ssn : 0)),
          buffered_amount_(std::move(on_buffered_amount_low)),
          total_buffered_amount_(total_buffered_amount) {}

    StreamID stream_id() const { return stream_id_; }

    // Enqueues a message to this stream.
    void Add(DcSctpMessage message,
             TimeMs expires_at,
             const SendOptions& send_options);

    // Produces a data chunk to send. This is only called on streams that have
    // data available.
    DataToSend Produce(TimeMs now, size_t max_size);

    const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
    ThresholdWatcher& buffered_amount() { return buffered_amount_; }

    // Discards a partially sent message, see `SendQueue::Discard`.
    bool Discard(IsUnordered unordered, MID message_id);

    // Pauses this stream, which is used before resetting it.
    void Pause();

    // Resumes a paused stream.
    void Resume();

    bool IsReadyToBeReset() const {
      return pause_state_ == PauseState::kPaused;
    }

    bool IsResetting() const { return pause_state_ == PauseState::kResetting; }

    void SetAsResetting() {
      RTC_DCHECK(pause_state_ == PauseState::kPaused);
      pause_state_ = PauseState::kResetting;
    }

    // Resets this stream, meaning MIDs and SSNs are set to zero.
    void Reset();

    // Indicates if this stream has a partially sent message in it.
    bool has_partially_sent_message() const;

    // Indicates if the stream has data to send. It will also try to remove any
    // expired non-partially sent message.
    bool HasDataToSend(TimeMs now);

    void set_priority(StreamPriority priority) { priority_ = priority; }
    StreamPriority priority() const { return priority_; }

    void AddHandoverState(
        DcSctpSocketHandoverState::OutgoingStream& state) const;

   private:
    // Streams are paused before they can be reset. To reset a stream, the
    // socket sends an outgoing stream reset command with the TSN of the last
    // fragment of the last message, so that receivers and senders can agree on
    // when it stopped. And if the send queue is in the middle of sending a
    // message, and without fragments not yet sent and without TSNs allocated to
    // them, it will keep sending data until that message has ended.
    enum class PauseState {
      // The stream is not paused, and not scheduled to be reset.
      kNotPaused,
      // The stream has requested to be reset/paused but is still producing
      // fragments of a message that hasn't ended yet. When it does, it will
      // transition to the `kPaused` state.
      kPending,
      // The stream is fully paused and can be reset.
      kPaused,
      // The stream has been added to an outgoing stream reset request and a
      // response from the peer hasn't been received yet.
      kResetting,
    };

    // An enqueued message and metadata.
    struct Item {
      explicit Item(DcSctpMessage msg,
                    TimeMs expires_at,
                    const SendOptions& send_options)
          : message(std::move(msg)),
            expires_at(expires_at),
            send_options(send_options),
            remaining_offset(0),
            remaining_size(message.payload().size()) {}
      DcSctpMessage message;
      TimeMs expires_at;
      SendOptions send_options;
      // The remaining payload (offset and size) to be sent, when it has been
      // fragmented.
      size_t remaining_offset;
      size_t remaining_size;
      // If set, an allocated Message ID and SSN. Will be allocated when the
      // first fragment is sent.
      absl::optional<MID> message_id = absl::nullopt;
      absl::optional<SSN> ssn = absl::nullopt;
      // The current Fragment Sequence Number, incremented for each fragment.
      FSN current_fsn = FSN(0);
    };

    bool IsConsistent() const;

    const StreamID stream_id_;
    StreamPriority priority_;
    PauseState pause_state_ = PauseState::kNotPaused;
    // MIDs are different for unordered and ordered messages sent on a stream.
    MID next_unordered_mid_;
    MID next_ordered_mid_;

    SSN next_ssn_;
    // Enqueued messages, and metadata.
    std::deque<Item> items_;

    // The current amount of buffered data.
    ThresholdWatcher buffered_amount_;

    // Reference to the total buffered amount, which is updated directly by each
    // stream.
    ThresholdWatcher& total_buffered_amount_;
  };

  bool IsConsistent() const;
  OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
  absl::optional<DataToSend> Produce(
      std::map<StreamID, OutgoingStream>::iterator it,
      TimeMs now,
      size_t max_size);

  // Return the next stream, in round-robin fashion.
  std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);

  const std::string log_prefix_;
  const size_t buffer_size_;
  const StreamPriority default_priority_;

  // Called when the buffered amount is below what has been set using
  // `SetBufferedAmountLowThreshold`.
  const std::function<void(StreamID)> on_buffered_amount_low_;

  // Called when the total buffered amount is below what has been set using
  // `SetTotalBufferedAmountLowThreshold`.
  const std::function<void()> on_total_buffered_amount_low_;

  // The total amount of buffer data, for all streams.
  ThresholdWatcher total_buffered_amount_;

  // Indicates if the previous fragment sent was the end of a message. For
  // non-interleaved sending, this means that the next message may come from a
  // different stream. If not true, the next fragment must be produced from the
  // same stream as last time.
  bool previous_message_has_ended_ = true;

  // The current stream to send chunks from. Modified by `GetNextStream`.
  StreamID current_stream_id_ = StreamID(0);

  // All streams, and messages added to those.
  std::map<StreamID, OutgoingStream> streams_;
};
}  // namespace dcsctp

#endif  // NET_DCSCTP_TX_RR_SEND_QUEUE_H_
