| /* |
| * Copyright (c) 2021 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| #ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_ |
| #define NET_DCSCTP_TX_RR_SEND_QUEUE_H_ |
| |
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

#include "absl/algorithm/container.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "net/dcsctp/public/dcsctp_socket.h"
#include "net/dcsctp/public/types.h"
#include "net/dcsctp/tx/send_queue.h"
| |
| namespace dcsctp { |
| |
// The Round Robin SendQueue holds all messages that the client wants to send,
// but that haven't yet been split into chunks and fully sent on the wire.
//
// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
// it will cycle to send messages from different streams. It will send all
// fragments from one message before continuing with a different message on
// possibly a different stream, until support for message interleaving has been
// implemented.
//
// As messages can be (requested to be) sent before the connection is properly
// established, this send queue is always present - even for closed connections.
| class RRSendQueue : public SendQueue { |
| public: |
| RRSendQueue(absl::string_view log_prefix, |
| size_t buffer_size, |
| StreamPriority default_priority, |
| std::function<void(StreamID)> on_buffered_amount_low, |
| size_t total_buffered_amount_low_threshold, |
| std::function<void()> on_total_buffered_amount_low); |
| |
| // Indicates if the buffer is full. Note that it's up to the caller to ensure |
| // that the buffer is not full prior to adding new items to it. |
| bool IsFull() const; |
| // Indicates if the buffer is empty. |
| bool IsEmpty() const; |
| |
| // Adds the message to be sent using the `send_options` provided. The current |
| // time should be in `now`. Note that it's the responsibility of the caller to |
| // ensure that the buffer is not full (by calling `IsFull`) before adding |
| // messages to it. |
| void Add(TimeMs now, |
| DcSctpMessage message, |
| const SendOptions& send_options = {}); |
| |
| // Implementation of `SendQueue`. |
| absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override; |
| bool Discard(IsUnordered unordered, |
| StreamID stream_id, |
| MID message_id) override; |
| void PrepareResetStream(StreamID streams) override; |
| bool HasStreamsReadyToBeReset() const override; |
| std::vector<StreamID> GetStreamsReadyToBeReset() override; |
| void CommitResetStreams() override; |
| void RollbackResetStreams() override; |
| void Reset() override; |
| size_t buffered_amount(StreamID stream_id) const override; |
| size_t total_buffered_amount() const override { |
| return total_buffered_amount_.value(); |
| } |
| size_t buffered_amount_low_threshold(StreamID stream_id) const override; |
| void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; |
| |
| void SetStreamPriority(StreamID stream_id, StreamPriority priority); |
| StreamPriority GetStreamPriority(StreamID stream_id) const; |
| HandoverReadinessStatus GetHandoverReadiness() const; |
| void AddHandoverState(DcSctpSocketHandoverState& state); |
| void RestoreFromState(const DcSctpSocketHandoverState& state); |
| |
| private: |
| // Represents a value and a "low threshold" that when the value reaches or |
| // goes under the "low threshold", will trigger `on_threshold_reached` |
| // callback. |
| class ThresholdWatcher { |
| public: |
| explicit ThresholdWatcher(std::function<void()> on_threshold_reached) |
| : on_threshold_reached_(std::move(on_threshold_reached)) {} |
| // Increases the value. |
| void Increase(size_t bytes) { value_ += bytes; } |
| // Decreases the value and triggers `on_threshold_reached` if it's at or |
| // below `low_threshold()`. |
| void Decrease(size_t bytes); |
| |
| size_t value() const { return value_; } |
| size_t low_threshold() const { return low_threshold_; } |
| void SetLowThreshold(size_t low_threshold); |
| |
| private: |
| const std::function<void()> on_threshold_reached_; |
| size_t value_ = 0; |
| size_t low_threshold_ = 0; |
| }; |
| |
| // Per-stream information. |
| class OutgoingStream { |
| public: |
| OutgoingStream( |
| StreamID stream_id, |
| StreamPriority priority, |
| std::function<void()> on_buffered_amount_low, |
| ThresholdWatcher& total_buffered_amount, |
| const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) |
| : stream_id_(stream_id), |
| priority_(priority), |
| next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), |
| next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), |
| next_ssn_(SSN(state ? state->next_ssn : 0)), |
| buffered_amount_(std::move(on_buffered_amount_low)), |
| total_buffered_amount_(total_buffered_amount) {} |
| |
| StreamID stream_id() const { return stream_id_; } |
| |
| // Enqueues a message to this stream. |
| void Add(DcSctpMessage message, |
| TimeMs expires_at, |
| const SendOptions& send_options); |
| |
| // Produces a data chunk to send, or `absl::nullopt` if nothing could be |
| // produced, e.g. if all messages have expired. |
| absl::optional<DataToSend> Produce(TimeMs now, size_t max_size); |
| |
| const ThresholdWatcher& buffered_amount() const { return buffered_amount_; } |
| ThresholdWatcher& buffered_amount() { return buffered_amount_; } |
| |
| // Discards a partially sent message, see `SendQueue::Discard`. |
| bool Discard(IsUnordered unordered, MID message_id); |
| |
| // Pauses this stream, which is used before resetting it. |
| void Pause(); |
| |
| // Resumes a paused stream. |
| void Resume(); |
| |
| bool IsReadyToBeReset() const { |
| return pause_state_ == PauseState::kPaused; |
| } |
| |
| bool IsResetting() const { return pause_state_ == PauseState::kResetting; } |
| |
| void SetAsResetting() { |
| RTC_DCHECK(pause_state_ == PauseState::kPaused); |
| pause_state_ = PauseState::kResetting; |
| } |
| |
| // Resets this stream, meaning MIDs and SSNs are set to zero. |
| void Reset(); |
| |
| // Indicates if this stream has a partially sent message in it. |
| bool has_partially_sent_message() const; |
| |
| // Indicates if the stream possibly has data to send. Note that it may |
| // return `true` for streams that have enqueued, but expired, messages. |
| bool HasDataToSend() const; |
| |
| void set_priority(StreamPriority priority) { priority_ = priority; } |
| StreamPriority priority() const { return priority_; } |
| |
| void AddHandoverState( |
| DcSctpSocketHandoverState::OutgoingStream& state) const; |
| |
| private: |
| // Streams are paused before they can be reset. To reset a stream, the |
| // socket sends an outgoing stream reset command with the TSN of the last |
| // fragment of the last message, so that receivers and senders can agree on |
| // when it stopped. And if the send queue is in the middle of sending a |
| // message, and without fragments not yet sent and without TSNs allocated to |
| // them, it will keep sending data until that message has ended. |
| enum class PauseState { |
| // The stream is not paused, and not scheduled to be reset. |
| kNotPaused, |
| // The stream has requested to be reset/paused but is still producing |
| // fragments of a message that hasn't ended yet. When it does, it will |
| // transition to the `kPaused` state. |
| kPending, |
| // The stream is fully paused and can be reset. |
| kPaused, |
| // The stream has been added to an outgoing stream reset request and a |
| // response from the peer hasn't been received yet. |
| kResetting, |
| }; |
| |
| // An enqueued message and metadata. |
| struct Item { |
| explicit Item(DcSctpMessage msg, |
| TimeMs expires_at, |
| const SendOptions& send_options) |
| : message(std::move(msg)), |
| expires_at(expires_at), |
| send_options(send_options), |
| remaining_offset(0), |
| remaining_size(message.payload().size()) {} |
| DcSctpMessage message; |
| TimeMs expires_at; |
| SendOptions send_options; |
| // The remaining payload (offset and size) to be sent, when it has been |
| // fragmented. |
| size_t remaining_offset; |
| size_t remaining_size; |
| // If set, an allocated Message ID and SSN. Will be allocated when the |
| // first fragment is sent. |
| absl::optional<MID> message_id = absl::nullopt; |
| absl::optional<SSN> ssn = absl::nullopt; |
| // The current Fragment Sequence Number, incremented for each fragment. |
| FSN current_fsn = FSN(0); |
| }; |
| |
| bool IsConsistent() const; |
| |
| const StreamID stream_id_; |
| StreamPriority priority_; |
| PauseState pause_state_ = PauseState::kNotPaused; |
| // MIDs are different for unordered and ordered messages sent on a stream. |
| MID next_unordered_mid_; |
| MID next_ordered_mid_; |
| |
| SSN next_ssn_; |
| // Enqueued messages, and metadata. |
| std::deque<Item> items_; |
| |
| // The current amount of buffered data. |
| ThresholdWatcher buffered_amount_; |
| |
| // Reference to the total buffered amount, which is updated directly by each |
| // stream. |
| ThresholdWatcher& total_buffered_amount_; |
| }; |
| |
| bool IsConsistent() const; |
| OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); |
| absl::optional<DataToSend> Produce( |
| std::map<StreamID, OutgoingStream>::iterator it, |
| TimeMs now, |
| size_t max_size); |
| |
| // Return the next stream, in round-robin fashion. |
| std::map<StreamID, OutgoingStream>::iterator GetNextStream(); |
| |
| const std::string log_prefix_; |
| const size_t buffer_size_; |
| const StreamPriority default_priority_; |
| |
| // Called when the buffered amount is below what has been set using |
| // `SetBufferedAmountLowThreshold`. |
| const std::function<void(StreamID)> on_buffered_amount_low_; |
| |
| // Called when the total buffered amount is below what has been set using |
| // `SetTotalBufferedAmountLowThreshold`. |
| const std::function<void()> on_total_buffered_amount_low_; |
| |
| // The total amount of buffer data, for all streams. |
| ThresholdWatcher total_buffered_amount_; |
| |
| // Indicates if the previous fragment sent was the end of a message. For |
| // non-interleaved sending, this means that the next message may come from a |
| // different stream. If not true, the next fragment must be produced from the |
| // same stream as last time. |
| bool previous_message_has_ended_ = true; |
| |
| // The current stream to send chunks from. Modified by `GetNextStream`. |
| StreamID current_stream_id_ = StreamID(0); |
| |
| // All streams, and messages added to those. |
| std::map<StreamID, OutgoingStream> streams_; |
| }; |
| } // namespace dcsctp |
| |
| #endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_ |