dcsctp: Stay in stream if not producing fragment
If there is only little space left in a packet, and the remaining data
for a partially sent message is much larger, it will not generate a
small fragment for this message. This is to avoid fragmenting a message
into too many packets, as that increases the risk of losing messages
when partial reliability is enabled.
And when a stream doesn't want to generate a too small fragment, the
scheduler should _not_ switch streams. It should only switch streams
when a message has been fully sent. Previously, it would switch stream
when a stream doesn't want to produce a message, but as noted above,
that could happen for other reasons.
This required some refactoring, which also increased its robustness by
now only doing explicit stream switching on fully produced messages.
Bug: webrtc:12832
Change-Id: Icb213774fd0d26fba5640b00aac0407d393e4bfc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220937
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34197}
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 37c6974..4bfbaf7 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -27,28 +27,30 @@
namespace dcsctp {
-RRSendQueue::OutgoingStream::Item*
-RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
+bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
while (!items_.empty()) {
RRSendQueue::OutgoingStream::Item& item = items_.front();
- // An entire item can be discarded iff:
- // 1) It hasn't been partially sent (has been allocated a message_id).
- // 2) It has a non-negative expiry time.
- // 3) And that expiry time has passed.
- if (!item.message_id.has_value() && item.expires_at.has_value() &&
- *item.expires_at <= now) {
- // TODO(boivie): This should be reported to the client.
+ if (item.message_id.has_value()) {
+ // Already partially sent messages can always continue to be sent.
+ return true;
+ }
+
+ // Message has expired. Remove it and inspect the next one.
+ if (item.expires_at.has_value() && *item.expires_at <= now) {
buffered_amount_.Decrease(item.remaining_size);
total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
+ RTC_DCHECK(IsConsistent());
continue;
}
- RTC_DCHECK(IsConsistent());
- return &item;
+ if (is_paused_) {
+ // The stream has paused (and there is no partially sent message).
+ return false;
+ }
+ return true;
}
- RTC_DCHECK(IsConsistent());
- return nullptr;
+ return false;
}
bool RRSendQueue::IsConsistent() const {
@@ -56,6 +58,23 @@
for (const auto& stream_entry : streams_) {
total_buffered_amount += stream_entry.second.buffered_amount().value();
}
+
+ if (previous_message_has_ended_) {
+ auto it = streams_.find(current_stream_id_);
+ if (it != streams_.end() && it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has ended, but still partial message in stream";
+ return false;
+ }
+ } else {
+ auto it = streams_.find(current_stream_id_);
+ if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+ RTC_DLOG(LS_ERROR)
+ << "Previous message has NOT ended, but there is no partial message";
+ return false;
+ }
+ }
+
return total_buffered_amount == total_buffered_amount_.value();
}
@@ -98,19 +117,9 @@
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
TimeMs now,
size_t max_size) {
- Item* item = GetFirstNonExpiredMessage(now);
- if (item == nullptr) {
- RTC_DCHECK(IsConsistent());
- return absl::nullopt;
- }
+ RTC_DCHECK(!items_.empty());
- // If a stream is paused, it will allow sending all partially sent messages
- // but will not start sending new fragments of completely unsent messages.
- if (is_paused_ && !item->message_id.has_value()) {
- RTC_DCHECK(IsConsistent());
- return absl::nullopt;
- }
-
+ Item* item = &items_.front();
DcSctpMessage& message = item->message;
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
@@ -269,50 +278,79 @@
return total_buffered_amount() == 0;
}
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(
- std::map<StreamID, RRSendQueue::OutgoingStream>::iterator it,
- TimeMs now,
- size_t max_size) {
- absl::optional<DataToSend> data = it->second.Produce(now, max_size);
- if (data.has_value()) {
- RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
- << data->data.size() << " bytes (max: " << max_size
- << ")";
+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream(TimeMs now) {
+ auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
- if (data->data.is_end) {
- // No more fragments. Continue with the next stream next time.
- next_stream_id_ = StreamID(*it->first + 1);
- }
- }
- RTC_DCHECK(IsConsistent());
- return data;
-}
-
-absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
- size_t max_size) {
- auto start_it = streams_.lower_bound(next_stream_id_);
for (auto it = start_it; it != streams_.end(); ++it) {
- next_stream_id_ = it->first;
- absl::optional<DataToSend> ret = Produce(it, now, max_size);
- if (ret.has_value()) {
- return ret;
+ if (it->second.HasDataToSend(now)) {
+ current_stream_id_ = it->first;
+ return it;
}
}
for (auto it = streams_.begin(); it != start_it; ++it) {
- next_stream_id_ = it->first;
- absl::optional<DataToSend> ret = Produce(it, now, max_size);
- if (ret.has_value()) {
- return ret;
+ if (it->second.HasDataToSend(now)) {
+ current_stream_id_ = it->first;
+ return it;
}
}
- return absl::nullopt;
+ return streams_.end();
+}
+
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+ size_t max_size) {
+ std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
+
+ if (previous_message_has_ended_) {
+ // Previous message has ended. Round-robin to a different stream, if there
+ // even is one with data to send.
+ stream_it = GetNextStream(now);
+ if (stream_it == streams_.end()) {
+ RTC_DLOG(LS_VERBOSE)
+ << log_prefix_
+ << "There is no stream with data; Can't produce any data.";
+ return absl::nullopt;
+ }
+ } else {
+ // The previous message has not ended; Continue from the current stream.
+ stream_it = streams_.find(current_stream_id_);
+ RTC_DCHECK(stream_it != streams_.end());
+ }
+
+ absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+ if (data.has_value()) {
+ RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+ << (data->data.is_unordered ? "unordered" : "ordered")
+ << "::"
+ << (*data->data.is_beginning && *data->data.is_end
+ ? "complete"
+ : *data->data.is_beginning
+ ? "first"
+ : *data->data.is_end ? "last" : "middle")
+ << ", stream_id=" << *stream_it->first
+ << ", ppid=" << *data->data.ppid
+ << ", length=" << data->data.payload.size();
+
+ previous_message_has_ended_ = *data->data.is_end;
+ }
+
+ RTC_DCHECK(IsConsistent());
+ return data;
}
bool RRSendQueue::Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) {
- return GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+ bool has_discarded =
+ GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+ if (has_discarded) {
+ // Only partially sent messages are discarded, so if a message was
+ // discarded, then it was the currently sent message.
+ previous_message_has_ended_ = true;
+ }
+
+ return has_discarded;
}
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
@@ -353,6 +391,7 @@
OutgoingStream& stream = stream_entry.second;
stream.Reset();
}
+ previous_message_has_ended_ = true;
}
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index bd96bb9..3ec45af 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -147,6 +147,10 @@
// Indicates if this stream has a partially sent message in it.
bool has_partially_sent_message() const;
+ // Indicates if the stream has data to send. It will also try to remove any
+ // expired non-partially sent message.
+ bool HasDataToSend(TimeMs now);
+
private:
// An enqueued message and metadata.
struct Item {
@@ -173,8 +177,6 @@
FSN current_fsn = FSN(0);
};
- // Returns the first non-expired message, or nullptr if there isn't one.
- Item* GetFirstNonExpiredMessage(TimeMs now);
bool IsConsistent() const;
// Streams are pause when they are about to be reset.
@@ -202,6 +204,9 @@
TimeMs now,
size_t max_size);
+ // Return the next stream, in round-robin fashion.
+ std::map<StreamID, OutgoingStream>::iterator GetNextStream(TimeMs now);
+
const std::string log_prefix_;
const size_t buffer_size_;
@@ -216,8 +221,14 @@
// The total amount of buffer data, for all streams.
ThresholdWatcher total_buffered_amount_;
- // The next stream to send chunks from.
- StreamID next_stream_id_ = StreamID(0);
+ // Indicates if the previous fragment sent was the end of a message. For
+ // non-interleaved sending, this means that the next message may come from a
+ // different stream. If not true, the next fragment must be produced from the
+ // same stream as last time.
+ bool previous_message_has_ended_ = true;
+
+ // The current stream to send chunks from. Modified by `GetNextStream`.
+ StreamID current_stream_id_ = StreamID(0);
// All streams, and messages added to those.
std::map<StreamID, OutgoingStream> streams_;
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 7a03109..682c16a 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -703,5 +703,40 @@
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
}
+
+TEST_F(RRSendQueueTest, WillStayInStreamWhenOnlySmallFragmentRemaining) {
+ buf_.Add(kNow,
+ DcSctpMessage(StreamID(5), kPPID,
+ std::vector<uint8_t>(kOneFragmentPacketSize * 2)));
+ buf_.Add(kNow, DcSctpMessage(StreamID(6), kPPID, std::vector<uint8_t>(1)));
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk1.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // Now assume that there will be a lot of previous chunks that need to be
+ // retransmitted, which fills up the next packet and there is little space
+ // left in the packet for new chunks. What it should NOT do right now is to
+ // try to send a message from StreamID 6. And it should not try to send a very
+ // small fragment from StreamID 5 either. So just skip this one.
+ EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+
+ // When the next produce request comes with a large buffer to fill, continue
+ // sending from StreamID 5.
+
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk2.data.stream_id, StreamID(5));
+ EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
+
+ // Lastly, produce a message on StreamID 6.
+ ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
+ buf_.Produce(kNow, kOneFragmentPacketSize));
+ EXPECT_EQ(chunk3.data.stream_id, StreamID(6));
+ EXPECT_THAT(chunk3.data.payload, SizeIs(1));
+
+ EXPECT_FALSE(buf_.Produce(kNow, 8).has_value());
+}
} // namespace
} // namespace dcsctp