dcsctp: Refactor Send Queue
Make HasDataToSend not mutate any state, and let the Produce method do
all state mutation and possibly indicate if there is nothing that can be
sent. This is helpful preparation for extracting the scheduling part of
the send queue to a separate component.
Bug: webrtc:5696
Change-Id: I132779e77d3ce6a41e5fcf4432140d3728d03cdc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261945
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37141}
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 3a2166b..6127da4 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -42,34 +42,18 @@
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
}
-bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
+bool RRSendQueue::OutgoingStream::HasDataToSend() const {
if (pause_state_ == PauseState::kPaused ||
pause_state_ == PauseState::kResetting) {
// The stream has paused (and there is no partially sent message).
return false;
}
- while (!items_.empty()) {
- RRSendQueue::OutgoingStream::Item& item = items_.front();
- if (item.message_id.has_value()) {
- // Already partially sent messages can always continue to be sent. This
- // ensures e.g. that paused streams with partially sent messages get to
- // send the partial message in full before resetting.
- return true;
- }
-
- // Message has expired. Remove it and inspect the next one.
- if (item.expires_at <= now) {
- buffered_amount_.Decrease(item.remaining_size);
- total_buffered_amount_.Decrease(item.remaining_size);
- items_.pop_front();
- RTC_DCHECK(IsConsistent());
- continue;
- }
-
- return true;
+ if (items_.empty()) {
+ return false;
}
- return false;
+
+ return true;
}
void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -141,83 +125,95 @@
RTC_DCHECK(IsConsistent());
}
-SendQueue::DataToSend RRSendQueue::OutgoingStream::Produce(TimeMs now,
- size_t max_size) {
- RTC_DCHECK(!items_.empty());
+absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
+ TimeMs now,
+ size_t max_size) {
RTC_DCHECK(pause_state_ != PauseState::kPaused &&
pause_state_ != PauseState::kResetting);
- Item* item = &items_.front();
- DcSctpMessage& message = item->message;
+ while (!items_.empty()) {
+ Item& item = items_.front();
+ DcSctpMessage& message = item.message;
- // Allocate Message ID and SSN when the first fragment is sent.
- if (!item->message_id.has_value()) {
- MID& mid =
- item->send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
- item->message_id = mid;
- mid = MID(*mid + 1);
- }
- if (!item->send_options.unordered && !item->ssn.has_value()) {
- item->ssn = next_ssn_;
- next_ssn_ = SSN(*next_ssn_ + 1);
- }
+ // Allocate Message ID and SSN when the first fragment is sent.
+ if (!item.message_id.has_value()) {
+ // Oops, this entire message has already expired. Try the next one.
+ if (item.expires_at <= now) {
+ buffered_amount_.Decrease(item.remaining_size);
+ total_buffered_amount_.Decrease(item.remaining_size);
+ items_.pop_front();
+ continue;
+ }
- // Grab the next `max_size` fragment from this message and calculate flags.
- rtc::ArrayView<const uint8_t> chunk_payload =
- item->message.payload().subview(item->remaining_offset, max_size);
- rtc::ArrayView<const uint8_t> message_payload = message.payload();
- Data::IsBeginning is_beginning(chunk_payload.data() ==
- message_payload.data());
- Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
- (message_payload.data() + message_payload.size()));
-
- StreamID stream_id = message.stream_id();
- PPID ppid = message.ppid();
-
- // Zero-copy the payload if the message fits in a single chunk.
- std::vector<uint8_t> payload =
- is_beginning && is_end
- ? std::move(message).ReleasePayload()
- : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
-
- FSN fsn(item->current_fsn);
- item->current_fsn = FSN(*item->current_fsn + 1);
- buffered_amount_.Decrease(payload.size());
- total_buffered_amount_.Decrease(payload.size());
-
- SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
- item->message_id.value(), fsn, ppid,
- std::move(payload), is_beginning, is_end,
- item->send_options.unordered));
- if (item->send_options.max_retransmissions.has_value() &&
- *item->send_options.max_retransmissions >=
- std::numeric_limits<MaxRetransmits::UnderlyingType>::min() &&
- *item->send_options.max_retransmissions <=
- std::numeric_limits<MaxRetransmits::UnderlyingType>::max()) {
- chunk.max_retransmissions =
- MaxRetransmits(*item->send_options.max_retransmissions);
- }
- chunk.expires_at = item->expires_at;
-
- if (is_end) {
- // The entire message has been sent, and its last data copied to `chunk`, so
- // it can safely be discarded.
- items_.pop_front();
-
- if (pause_state_ == PauseState::kPending) {
- RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id
- << " is moving from pending to paused";
- pause_state_ = PauseState::kPaused;
+ MID& mid =
+ item.send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
+ item.message_id = mid;
+ mid = MID(*mid + 1);
}
- } else {
- item->remaining_offset += chunk_payload.size();
- item->remaining_size -= chunk_payload.size();
- RTC_DCHECK(item->remaining_offset + item->remaining_size ==
- item->message.payload().size());
- RTC_DCHECK(item->remaining_size > 0);
+ if (!item.send_options.unordered && !item.ssn.has_value()) {
+ item.ssn = next_ssn_;
+ next_ssn_ = SSN(*next_ssn_ + 1);
+ }
+
+ // Grab the next `max_size` fragment from this message and calculate flags.
+ rtc::ArrayView<const uint8_t> chunk_payload =
+ item.message.payload().subview(item.remaining_offset, max_size);
+ rtc::ArrayView<const uint8_t> message_payload = message.payload();
+ Data::IsBeginning is_beginning(chunk_payload.data() ==
+ message_payload.data());
+ Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
+ (message_payload.data() + message_payload.size()));
+
+ StreamID stream_id = message.stream_id();
+ PPID ppid = message.ppid();
+
+ // Zero-copy the payload if the message fits in a single chunk.
+ std::vector<uint8_t> payload =
+ is_beginning && is_end
+ ? std::move(message).ReleasePayload()
+ : std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
+
+ FSN fsn(item.current_fsn);
+ item.current_fsn = FSN(*item.current_fsn + 1);
+ buffered_amount_.Decrease(payload.size());
+ total_buffered_amount_.Decrease(payload.size());
+
+ SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
+ item.message_id.value(), fsn, ppid,
+ std::move(payload), is_beginning, is_end,
+ item.send_options.unordered));
+ if (item.send_options.max_retransmissions.has_value() &&
+ *item.send_options.max_retransmissions >=
+ std::numeric_limits<MaxRetransmits::UnderlyingType>::min() &&
+ *item.send_options.max_retransmissions <=
+ std::numeric_limits<MaxRetransmits::UnderlyingType>::max()) {
+ chunk.max_retransmissions =
+ MaxRetransmits(*item.send_options.max_retransmissions);
+ }
+ chunk.expires_at = item.expires_at;
+
+ if (is_end) {
+ // The entire message has been sent, and its last data copied to `chunk`,
+ // so it can safely be discarded.
+ items_.pop_front();
+
+ if (pause_state_ == PauseState::kPending) {
+ RTC_DLOG(LS_VERBOSE) << "Pause state on " << *stream_id
+ << " is moving from pending to paused";
+ pause_state_ = PauseState::kPaused;
+ }
+ } else {
+ item.remaining_offset += chunk_payload.size();
+ item.remaining_size -= chunk_payload.size();
+ RTC_DCHECK(item.remaining_offset + item.remaining_size ==
+ item.message.payload().size());
+ RTC_DCHECK(item.remaining_size > 0);
+ }
+ RTC_DCHECK(IsConsistent());
+ return chunk;
}
RTC_DCHECK(IsConsistent());
- return chunk;
+ return absl::nullopt;
}
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
@@ -355,18 +351,18 @@
}
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
-RRSendQueue::GetNextStream(TimeMs now) {
+RRSendQueue::GetNextStream() {
auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
for (auto it = start_it; it != streams_.end(); ++it) {
- if (it->second.HasDataToSend(now)) {
+ if (it->second.HasDataToSend()) {
current_stream_id_ = it->first;
return it;
}
}
for (auto it = streams_.begin(); it != start_it; ++it) {
- if (it->second.HasDataToSend(now)) {
+ if (it->second.HasDataToSend()) {
current_stream_id_ = it->first;
return it;
}
@@ -378,39 +374,43 @@
size_t max_size) {
std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
- if (previous_message_has_ended_) {
- // Previous message has ended. Round-robin to a different stream, if there
- // even is one with data to send.
- stream_it = GetNextStream(now);
- if (stream_it == streams_.end()) {
- RTC_DLOG(LS_VERBOSE)
- << log_prefix_
- << "There is no stream with data; Can't produce any data.";
- return absl::nullopt;
+ for (;;) {
+ if (previous_message_has_ended_) {
+ // Previous message has ended. Round-robin to a different stream, if there
+ // even is one with data to send.
+ stream_it = GetNextStream();
+ if (stream_it == streams_.end()) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "There is no stream with data; Can't produce any data.";
+ return absl::nullopt;
+ }
+ } else {
+ // The previous message has not ended; Continue from the current stream.
+ stream_it = streams_.find(current_stream_id_);
+ RTC_DCHECK(stream_it != streams_.end());
}
- } else {
- // The previous message has not ended; Continue from the current stream.
- stream_it = streams_.find(current_stream_id_);
- RTC_DCHECK(stream_it != streams_.end());
+
+ absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+ if (!data.has_value()) {
+ continue;
+ }
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+ << (data->data.is_unordered ? "unordered" : "ordered")
+ << "::"
+ << (*data->data.is_beginning && *data->data.is_end
+ ? "complete"
+ : *data->data.is_beginning ? "first"
+ : *data->data.is_end ? "last"
+ : "middle")
+ << ", stream_id=" << *stream_it->first
+ << ", ppid=" << *data->data.ppid
+ << ", length=" << data->data.payload.size();
+
+ previous_message_has_ended_ = *data->data.is_end;
+ RTC_DCHECK(IsConsistent());
+ return data;
}
-
- DataToSend data = stream_it->second.Produce(now, max_size);
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
- << (data.data.is_unordered ? "unordered" : "ordered")
- << "::"
- << (*data.data.is_beginning && *data.data.is_end
- ? "complete"
- : *data.data.is_beginning
- ? "first"
- : *data.data.is_end ? "last" : "middle")
- << ", stream_id=" << *stream_it->first
- << ", ppid=" << *data.data.ppid
- << ", length=" << data.data.payload.size();
-
- previous_message_has_ended_ = *data.data.is_end;
-
- RTC_DCHECK(IsConsistent());
- return data;
}
bool RRSendQueue::Discard(IsUnordered unordered,
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 7ddb426..59f0d91 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -134,9 +134,9 @@
TimeMs expires_at,
const SendOptions& send_options);
- // Produces a data chunk to send. This is only called on streams that have
- // data available.
- DataToSend Produce(TimeMs now, size_t max_size);
+ // Produces a data chunk to send, or `absl::nullopt` if nothing could be
+ // produced, e.g. if all messages have expired.
+ absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -167,9 +167,9 @@
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
- // Indicates if the stream has data to send. It will also try to remove any
- // expired non-partially sent message.
- bool HasDataToSend(TimeMs now);
+ // Indicates if the stream possibly has data to send. Note that it may
+ // return `true` for streams that have enqueued, but expired, messages.
+ bool HasDataToSend() const;
void set_priority(StreamPriority priority) { priority_ = priority; }
StreamPriority priority() const { return priority_; }
@@ -252,7 +252,7 @@
size_t max_size);
// Return the next stream, in round-robin fashion.
- std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);
+ std::map<StreamID, OutgoingStream>::iterator GetNextStream();
const std::string log_prefix_;
const size_t buffer_size_;