dcsctp: Rename FCFSSendQueue to RRSendQueue
The current send queue implements SCTP_SS_FCFS as defined in
https://datatracker.ietf.org/doc/html/rfc8260#section-3.1, but that has
always been known to be a temporary solution. The end goal is to
implement a Weighted Fair Queueing Scheduler (SCTP_SS_WFQ), but that's
likely to take some time.
Meanwhile, a round robin scheduler (SCTP_SS_RR) will be used to avoid
some issues with the current scheduler, such as a single data channel
completely blocking all others if it sends a lot of messages.
In this first commit, the code has simply been renamed and is still
implementing first-come-first-served. That will be fixed in follow-up
CLS.
Bug: webrtc:12793
Change-Id: Idc03b1594551bfe1ddbe1710872814b9fdf60cc9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219684
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34090}
diff --git a/net/dcsctp/socket/BUILD.gn b/net/dcsctp/socket/BUILD.gn
index 2fb05ab..58abd7a 100644
--- a/net/dcsctp/socket/BUILD.gn
+++ b/net/dcsctp/socket/BUILD.gn
@@ -133,10 +133,10 @@
"../rx:data_tracker",
"../rx:reassembly_queue",
"../timer",
- "../tx:fcfs_send_queue",
"../tx:retransmission_error_counter",
"../tx:retransmission_queue",
"../tx:retransmission_timeout",
+ "../tx:rr_send_queue",
"../tx:send_queue",
]
sources = [
diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h
index 24c0437..96f00d1 100644
--- a/net/dcsctp/socket/dcsctp_socket.h
+++ b/net/dcsctp/socket/dcsctp_socket.h
@@ -49,10 +49,10 @@
#include "net/dcsctp/socket/state_cookie.h"
#include "net/dcsctp/socket/transmission_control_block.h"
#include "net/dcsctp/timer/timer.h"
-#include "net/dcsctp/tx/fcfs_send_queue.h"
#include "net/dcsctp/tx/retransmission_error_counter.h"
#include "net/dcsctp/tx/retransmission_queue.h"
#include "net/dcsctp/tx/retransmission_timeout.h"
+#include "net/dcsctp/tx/rr_send_queue.h"
namespace dcsctp {
@@ -257,7 +257,7 @@
// The actual SendQueue implementation. As data can be sent on a socket before
// the connection is established, this component is not in the TCB.
- FCFSSendQueue send_queue_;
+ RRSendQueue send_queue_;
// Only valid when state == State::kCookieEchoed
// A cached Cookie Echo Chunk, to be re-sent on timer expiry.
diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn
index 924a194..641c8a6 100644
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -20,7 +20,7 @@
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
}
-rtc_library("fcfs_send_queue") {
+rtc_library("rr_send_queue") {
deps = [
":send_queue",
"../../../api:array_view",
@@ -32,8 +32,8 @@
"../public:types",
]
sources = [
- "fcfs_send_queue.cc",
- "fcfs_send_queue.h",
+ "rr_send_queue.cc",
+ "rr_send_queue.h",
]
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
@@ -111,11 +111,11 @@
testonly = true
deps = [
- ":fcfs_send_queue",
":mock_send_queue",
":retransmission_error_counter",
":retransmission_queue",
":retransmission_timeout",
+ ":rr_send_queue",
":send_queue",
"../../../api:array_view",
"../../../rtc_base:checks",
@@ -131,10 +131,10 @@
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
sources = [
- "fcfs_send_queue_test.cc",
"retransmission_error_counter_test.cc",
"retransmission_queue_test.cc",
"retransmission_timeout_test.cc",
+ "rr_send_queue_test.cc",
]
}
}
diff --git a/net/dcsctp/tx/fcfs_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
similarity index 88%
rename from net/dcsctp/tx/fcfs_send_queue.cc
rename to net/dcsctp/tx/rr_send_queue.cc
index f2dc5e4..f2d22c8 100644
--- a/net/dcsctp/tx/fcfs_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -7,7 +7,7 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "net/dcsctp/tx/fcfs_send_queue.h"
+#include "net/dcsctp/tx/rr_send_queue.h"
#include <cstdint>
#include <deque>
@@ -26,9 +26,9 @@
#include "rtc_base/logging.h"
namespace dcsctp {
-void FCFSSendQueue::Add(TimeMs now,
- DcSctpMessage message,
- const SendOptions& send_options) {
+void RRSendQueue::Add(TimeMs now,
+ DcSctpMessage message,
+ const SendOptions& send_options) {
RTC_DCHECK(!message.payload().empty());
std::deque<Item>& queue =
IsPaused(message.stream_id()) ? paused_items_ : items_;
@@ -44,7 +44,7 @@
queue.emplace_back(std::move(message), expires_at, send_options);
}
-size_t FCFSSendQueue::total_bytes() const {
+size_t RRSendQueue::total_bytes() const {
// TODO(boivie): Have the current size as a member variable, so that's it not
// calculated for every operation.
return absl::c_accumulate(items_, 0,
@@ -57,17 +57,17 @@
});
}
-bool FCFSSendQueue::IsFull() const {
+bool RRSendQueue::IsFull() const {
return total_bytes() >= buffer_size_;
}
-bool FCFSSendQueue::IsEmpty() const {
+bool RRSendQueue::IsEmpty() const {
return items_.empty();
}
-FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
+RRSendQueue::Item* RRSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
while (!items_.empty()) {
- FCFSSendQueue::Item& item = items_.front();
+ RRSendQueue::Item& item = items_.front();
// An entire item can be discarded iff:
// 1) It hasn't been partially sent (has been allocated a message_id).
// 2) It has a non-negative expiry time.
@@ -87,8 +87,8 @@
return nullptr;
}
-absl::optional<SendQueue::DataToSend> FCFSSendQueue::Produce(TimeMs now,
- size_t max_size) {
+absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
+ size_t max_size) {
Item* item = GetFirstNonExpiredMessage(now);
if (item == nullptr) {
return absl::nullopt;
@@ -163,9 +163,9 @@
return chunk;
}
-void FCFSSendQueue::Discard(IsUnordered unordered,
- StreamID stream_id,
- MID message_id) {
+void RRSendQueue::Discard(IsUnordered unordered,
+ StreamID stream_id,
+ MID message_id) {
// As this method will only discard partially sent messages, and as the queue
// is a FIFO queue, the only partially sent message would be the topmost
// message.
@@ -179,8 +179,7 @@
}
}
-void FCFSSendQueue::PrepareResetStreams(
- rtc::ArrayView<const StreamID> streams) {
+void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {
for (StreamID stream_id : streams) {
paused_streams_.insert(stream_id);
}
@@ -197,7 +196,7 @@
}
}
-bool FCFSSendQueue::CanResetStreams() const {
+bool RRSendQueue::CanResetStreams() const {
for (auto& item : items_) {
if (IsPaused(item.message.stream_id())) {
return false;
@@ -206,7 +205,7 @@
return true;
}
-void FCFSSendQueue::CommitResetStreams() {
+void RRSendQueue::CommitResetStreams() {
for (StreamID stream_id : paused_streams_) {
ssn_by_stream_id_[stream_id] = SSN(0);
// https://tools.ietf.org/html/rfc8260#section-2.3.2
@@ -219,7 +218,7 @@
RollbackResetStreams();
}
-void FCFSSendQueue::RollbackResetStreams() {
+void RRSendQueue::RollbackResetStreams() {
while (!paused_items_.empty()) {
items_.push_back(std::move(paused_items_.front()));
paused_items_.pop_front();
@@ -227,7 +226,7 @@
paused_streams_.clear();
}
-void FCFSSendQueue::Reset() {
+void RRSendQueue::Reset() {
if (!items_.empty()) {
// If this message has been partially sent, reset it so that it will be
// re-sent.
@@ -243,7 +242,7 @@
ssn_by_stream_id_.clear();
}
-bool FCFSSendQueue::IsPaused(StreamID stream_id) const {
+bool RRSendQueue::IsPaused(StreamID stream_id) const {
return paused_streams_.find(stream_id) != paused_streams_.end();
}
diff --git a/net/dcsctp/tx/fcfs_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
similarity index 81%
rename from net/dcsctp/tx/fcfs_send_queue.h
rename to net/dcsctp/tx/rr_send_queue.h
index 63e7eab..c43dc91 100644
--- a/net/dcsctp/tx/fcfs_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -7,8 +7,8 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
-#ifndef NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
-#define NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
+#ifndef NET_DCSCTP_TX_RR_SEND_QUEUE_H_
+#define NET_DCSCTP_TX_RR_SEND_QUEUE_H_
#include <cstdint>
#include <deque>
@@ -29,24 +29,23 @@
namespace dcsctp {
-// The FCFSSendQueue (First-Come, First-Served Send Queue) holds all messages
-// that the client wants to send, but that haven't yet been split into chunks
-// and sent on the wire.
+// The Round Robin SendQueue holds all messages that the client wants to send,
+// but that haven't yet been split into chunks and fully sent on the wire.
//
-// First-Come, First Served means that it passes the data in the exact same
-// order as they were delivered by the calling application, and is defined in
-// https://tools.ietf.org/html/rfc8260#section-3.1. It's a FIFO queue, but that
-// term isn't used in this RFC.
+// As defined in https://datatracker.ietf.org/doc/html/rfc8260#section-3.2,
+// it will cycle to send messages from different streams. It will send all
+// fragments from one message before continuing with a different message on
+// possibly a different stream, until support for message interleaving has been
+// implemented.
//
-// As messages can be (requested to be) sent before
-// the connection is properly established, this send queue is always present -
-// even for closed connections.
-class FCFSSendQueue : public SendQueue {
+// As messages can be (requested to be) sent before the connection is properly
+// established, this send queue is always present - even for closed connections.
+class RRSendQueue : public SendQueue {
public:
// How small a data chunk's payload may be, if having to fragment a message.
static constexpr size_t kMinimumFragmentedPayload = 10;
- FCFSSendQueue(absl::string_view log_prefix, size_t buffer_size)
+ RRSendQueue(absl::string_view log_prefix, size_t buffer_size)
: log_prefix_(std::string(log_prefix) + "fcfs: "),
buffer_size_(buffer_size) {}
@@ -120,4 +119,4 @@
};
} // namespace dcsctp
-#endif // NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
+#endif // NET_DCSCTP_TX_RR_SEND_QUEUE_H_
diff --git a/net/dcsctp/tx/fcfs_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
similarity index 90%
rename from net/dcsctp/tx/fcfs_send_queue_test.cc
rename to net/dcsctp/tx/rr_send_queue_test.cc
index a67a0a1..0f6fd2b 100644
--- a/net/dcsctp/tx/fcfs_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -7,7 +7,7 @@
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
-#include "net/dcsctp/tx/fcfs_send_queue.h"
+#include "net/dcsctp/tx/rr_send_queue.h"
#include <cstdint>
#include <type_traits>
@@ -29,21 +29,21 @@
constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53);
-class FCFSSendQueueTest : public testing::Test {
+class RRSendQueueTest : public testing::Test {
protected:
- FCFSSendQueueTest() : buf_("log: ", 100) {}
+ RRSendQueueTest() : buf_("log: ", 100) {}
const DcSctpOptions options_;
- FCFSSendQueue buf_;
+ RRSendQueue buf_;
};
-TEST_F(FCFSSendQueueTest, EmptyBuffer) {
+TEST_F(RRSendQueueTest, EmptyBuffer) {
EXPECT_TRUE(buf_.IsEmpty());
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
EXPECT_FALSE(buf_.IsFull());
}
-TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) {
+TEST_F(RRSendQueueTest, AddAndGetSingleChunk) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
EXPECT_FALSE(buf_.IsEmpty());
@@ -54,7 +54,7 @@
EXPECT_TRUE(chunk_opt->data.is_end);
}
-TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) {
+TEST_F(RRSendQueueTest, CarveOutBeginningMiddleAndEnd) {
std::vector<uint8_t> payload(60);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@@ -79,7 +79,7 @@
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
}
-TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) {
+TEST_F(RRSendQueueTest, GetChunksFromTwoMessages) {
std::vector<uint8_t> payload(60);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
@@ -99,7 +99,7 @@
EXPECT_TRUE(chunk_two->data.is_end);
}
-TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) {
+TEST_F(RRSendQueueTest, BufferBecomesFullAndEmptied) {
std::vector<uint8_t> payload(60);
EXPECT_FALSE(buf_.IsFull());
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@@ -136,20 +136,20 @@
EXPECT_TRUE(buf_.IsEmpty());
}
-TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) {
- std::vector<uint8_t> payload(FCFSSendQueue::kMinimumFragmentedPayload + 1);
+TEST_F(RRSendQueueTest, WillNotSendTooSmallPacket) {
+ std::vector<uint8_t> payload(RRSendQueue::kMinimumFragmentedPayload + 1);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
// Wouldn't fit enough payload (wouldn't want to fragment)
EXPECT_FALSE(
buf_.Produce(kNow,
- /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1)
+ /*max_size=*/RRSendQueue::kMinimumFragmentedPayload - 1)
.has_value());
// Minimum fragment
absl::optional<SendQueue::DataToSend> chunk_one =
buf_.Produce(kNow,
- /*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload);
+ /*max_size=*/RRSendQueue::kMinimumFragmentedPayload);
ASSERT_TRUE(chunk_one.has_value());
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
EXPECT_EQ(chunk_one->data.ppid, kPPID);
@@ -165,7 +165,7 @@
EXPECT_TRUE(buf_.IsEmpty());
}
-TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) {
+TEST_F(RRSendQueueTest, DefaultsToOrderedSend) {
std::vector<uint8_t> payload(20);
// Default is ordered
@@ -185,7 +185,7 @@
EXPECT_TRUE(chunk_two->data.is_unordered);
}
-TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) {
+TEST_F(RRSendQueueTest, ProduceWithLifetimeExpiry) {
std::vector<uint8_t> payload(20);
// Default is no expiry
@@ -225,7 +225,7 @@
ASSERT_FALSE(buf_.Produce(now, 100));
}
-TEST_F(FCFSSendQueueTest, DiscardPartialPackets) {
+TEST_F(RRSendQueueTest, DiscardPartialPackets) {
std::vector<uint8_t> payload(120);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@@ -255,7 +255,7 @@
ASSERT_FALSE(buf_.Produce(kNow, 100));
}
-TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) {
+TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) {
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
EXPECT_EQ(buf_.total_bytes(), 8u);
@@ -267,7 +267,7 @@
EXPECT_EQ(buf_.total_bytes(), 0u);
}
-TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) {
+TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) {
std::vector<uint8_t> payload(120);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@@ -283,7 +283,7 @@
EXPECT_EQ(buf_.total_bytes(), payload.size() - 50);
}
-TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
+TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
std::vector<uint8_t> payload(50);
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
@@ -302,7 +302,7 @@
EXPECT_EQ(buf_.total_bytes(), 0u);
}
-TEST_F(FCFSSendQueueTest, CommittingResetsSSN) {
+TEST_F(RRSendQueueTest, CommittingResetsSSN) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
@@ -330,7 +330,7 @@
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
}
-TEST_F(FCFSSendQueueTest, RollBackResumesSSN) {
+TEST_F(RRSendQueueTest, RollBackResumesSSN) {
std::vector<uint8_t> payload(50);
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));