Move ownership of congestion window state to rtp sender controller.
When congestion window is used, two different mechanisms can currently
update the outstanding data state in the pacer:
* OnPacketSent() withing the pacer itself, when a packet is sent
* UpdateOutstandingData(), when RtpTransportControllerSend either:
a. Receives an OnPacketSent() callback (increase outstanding data)
b. Receives transport feedback (decrease outstanding data)
This creates a lot of calls to UpdateOutstandingData(), more than one
per sent packet. Each requires locking and/or thread jumps. To avoid
that, this CL moves the congestion window state to
RtpTransportController send - and we only post a congested flag down
the the pacer when the state is changed.
The only benefit I can see is of the old way is we prevent sending
new packets immedately when the window is full, rather than in some
edge cases queue extra packets on the network task queue before the
congestion signal is received. That should be rare and benign.
I think this simplified logic, which is easier to read and more
performant, is a better tradeoff.
Bug: webrtc:13417
Change-Id: I326dd88db86dc0d6dc685c61920654ac024e57ef
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/255600
Auto-Submit: Erik Språng <sprang@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36220}
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 5eed6c2..9b419f0 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -87,18 +87,10 @@
}
}
-void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
+void PacedSender::SetCongested(bool congested) {
{
MutexLock lock(&mutex_);
- pacing_controller_.SetCongestionWindow(congestion_window_size);
- }
- MaybeWakupProcessThread();
-}
-
-void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
- {
- MutexLock lock(&mutex_);
- pacing_controller_.UpdateOutstandingData(outstanding_data);
+ pacing_controller_.SetCongested(congested);
}
MaybeWakupProcessThread();
}
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 88fd796..bf82e78 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -80,8 +80,7 @@
// Resume sending packets.
void Resume() override;
- void SetCongestionWindow(DataSize congestion_window_size) override;
- void UpdateOutstandingData(DataSize outstanding_data) override;
+ void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 9bfe85c..c9628e0 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -129,8 +129,7 @@
last_send_time_(last_process_time_),
packet_queue_(last_process_time_),
packet_counter_(0),
- congestion_window_size_(DataSize::PlusInfinity()),
- outstanding_data_(DataSize::Zero()),
+ congested_(false),
queue_time_limit(kMaxExpectedQueueLength),
account_for_audio_(false),
include_overhead_(false) {
@@ -169,29 +168,11 @@
return paused_;
}
-void PacingController::SetCongestionWindow(DataSize congestion_window_size) {
- const bool was_congested = Congested();
- congestion_window_size_ = congestion_window_size;
- if (was_congested && !Congested()) {
- TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime());
- UpdateBudgetWithElapsedTime(elapsed_time);
+void PacingController::SetCongested(bool congested) {
+ if (congested_ && !congested) {
+ UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(CurrentTime()));
}
-}
-
-void PacingController::UpdateOutstandingData(DataSize outstanding_data) {
- const bool was_congested = Congested();
- outstanding_data_ = outstanding_data;
- if (was_congested && !Congested()) {
- TimeDelta elapsed_time = UpdateTimeAndGetElapsed(CurrentTime());
- UpdateBudgetWithElapsedTime(elapsed_time);
- }
-}
-
-bool PacingController::Congested() const {
- if (congestion_window_size_.IsFinite()) {
- return outstanding_data_ >= congestion_window_size_;
- }
- return false;
+ congested_ = congested;
}
bool PacingController::IsProbing() const {
@@ -327,7 +308,7 @@
}
bool PacingController::ShouldSendKeepalive(Timestamp now) const {
- if (send_padding_if_silent_ || paused_ || Congested() ||
+ if (send_padding_if_silent_ || paused_ || congested_ ||
packet_counter_ == 0) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
@@ -373,7 +354,7 @@
}
}
- if (Congested() || packet_counter_ == 0) {
+ if (congested_ || packet_counter_ == 0) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
@@ -623,7 +604,7 @@
return DataSize::Zero();
}
- if (Congested()) {
+ if (congested_) {
// Don't add padding if congested, even if requested for probing.
return DataSize::Zero();
}
@@ -665,7 +646,7 @@
!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();
bool is_probe = pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe;
if (!unpaced_audio_packet && !is_probe) {
- if (Congested()) {
+ if (congested_) {
// Don't send anything if congested.
return nullptr;
}
@@ -728,7 +709,6 @@
}
void PacingController::UpdateBudgetWithSentData(DataSize size) {
- outstanding_data_ += size;
if (mode_ == ProcessMode::kPeriodic) {
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index f7c5601..d0c2e73 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -97,8 +97,7 @@
void Resume(); // Resume sending packets.
bool IsPaused() const;
- void SetCongestionWindow(DataSize congestion_window_size);
- void UpdateOutstandingData(DataSize outstanding_data);
+ void SetCongested(bool congested);
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
@@ -145,8 +144,6 @@
// is available.
void ProcessPackets();
- bool Congested() const;
-
bool IsProbing() const;
private:
@@ -225,8 +222,7 @@
RoundRobinPacketQueue packet_queue_;
uint64_t packet_counter_;
- DataSize congestion_window_size_;
- DataSize outstanding_data_;
+ bool congested_;
TimeDelta queue_time_limit;
bool account_for_audio_;
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index af2ce54..c3ab266 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -399,12 +399,11 @@
EXPECT_CALL(callback_, SendPadding).Times(0);
PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam());
pacer.SetPacingRates(DataRate::KilobitsPerSec(10000), DataRate::Zero());
- pacer.SetCongestionWindow(DataSize::Bytes(video.packet_size - 100));
- pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
+ pacer.SetCongested(true);
// Audio packet blocked due to congestion.
InsertPacket(&pacer, &audio);
EXPECT_CALL(callback_, SendPacket).Times(0);
@@ -416,7 +415,7 @@
ProcessNext(&pacer);
// Audio packet unblocked when congestion window clear.
::testing::Mock::VerifyAndClearExpectations(&callback_);
- pacer.UpdateOutstandingData(DataSize::Zero());
+ pacer.SetCongested(false);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
}
@@ -427,12 +426,11 @@
const test::ExplicitKeyValueConfig trials("");
PacingController pacer(&clock_, &callback_, nullptr, trials, GetParam());
pacer.SetPacingRates(DataRate::BitsPerSec(10000000), DataRate::Zero());
- pacer.SetCongestionWindow(DataSize::Bytes(800));
- pacer.UpdateOutstandingData(DataSize::Zero());
// Video packet fills congestion window.
InsertPacket(&pacer, &video);
EXPECT_CALL(callback_, SendPacket).Times(1);
ProcessNext(&pacer);
+ pacer.SetCongested(true);
// Audio not blocked due to congestion.
InsertPacket(&pacer, &audio);
EXPECT_CALL(callback_, SendPacket).Times(1);
@@ -1062,21 +1060,18 @@
uint32_t ssrc = 202020;
uint16_t sequence_number = 1000;
int kPacketSize = 250;
- int kCongestionWindow = kPacketSize * 10;
- pacer_->UpdateOutstandingData(DataSize::Zero());
- pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow));
- int sent_data = 0;
- while (sent_data < kCongestionWindow) {
- sent_data += kPacketSize;
- SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
- clock_.TimeInMilliseconds(), kPacketSize);
- AdvanceTimeAndProcess();
- }
+ // Send an initial packet so we have a last send time.
+ SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ AdvanceTimeAndProcess();
::testing::Mock::VerifyAndClearExpectations(&callback_);
+
+ // Set congested state, we should not send anything until the 500ms since
+ // last send time limit for keep-alives is triggered.
EXPECT_CALL(callback_, SendPacket).Times(0);
EXPECT_CALL(callback_, SendPadding).Times(0);
-
+ pacer_->SetCongested(true);
size_t blocked_packets = 0;
int64_t expected_time_until_padding = 500;
while (expected_time_until_padding > 5) {
@@ -1087,6 +1082,7 @@
pacer_->ProcessPackets();
expected_time_until_padding -= 5;
}
+
::testing::Mock::VerifyAndClearExpectations(&callback_);
EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1);
@@ -1105,15 +1101,13 @@
// to be sent in a row.
pacer_->SetPacingRates(DataRate::BitsPerSec(400 * 8 * 1000 / 5),
DataRate::Zero());
- // The congestion window is small enough to only let one packet through.
- pacer_->SetCongestionWindow(DataSize::Bytes(800));
- pacer_->UpdateOutstandingData(DataSize::Zero());
// Not yet budget limited or congested, packet is sent.
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
clock_.AdvanceTimeMilliseconds(5);
pacer_->ProcessPackets();
// Packet blocked due to congestion.
+ pacer_->SetCongested(true);
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(0);
clock_.AdvanceTimeMilliseconds(5);
@@ -1127,7 +1121,7 @@
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
EXPECT_CALL(callback_, SendPacket).Times(1);
clock_.AdvanceTimeMilliseconds(5);
- pacer_->UpdateOutstandingData(DataSize::Zero());
+ pacer_->SetCongested(false);
pacer_->ProcessPackets();
// Should be blocked due to budget limitation as congestion has be removed.
Send(RtpPacketMediaType::kVideo, ssrc, seq_num++, now_ms(), size);
@@ -1136,61 +1130,6 @@
pacer_->ProcessPackets();
}
-TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) {
- uint32_t ssrc = 202020;
- uint16_t sequence_number = 1000;
- int64_t kPacketSize = 250;
- int64_t kCongestionCount = 10;
- int64_t kCongestionWindow = kPacketSize * kCongestionCount;
- int64_t kCongestionTimeMs = 1000;
-
- pacer_->UpdateOutstandingData(DataSize::Zero());
- pacer_->SetCongestionWindow(DataSize::Bytes(kCongestionWindow));
- int sent_data = 0;
- while (sent_data < kCongestionWindow) {
- sent_data += kPacketSize;
- SendAndExpectPacket(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
- clock_.TimeInMilliseconds(), kPacketSize);
- clock_.AdvanceTimeMilliseconds(5);
- pacer_->ProcessPackets();
- }
- ::testing::Mock::VerifyAndClearExpectations(&callback_);
- EXPECT_CALL(callback_, SendPacket).Times(0);
- int unacked_packets = 0;
- for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
- Send(RtpPacketMediaType::kVideo, ssrc, sequence_number++,
- clock_.TimeInMilliseconds(), kPacketSize);
- unacked_packets++;
- clock_.AdvanceTimeMilliseconds(5);
- pacer_->ProcessPackets();
- }
- ::testing::Mock::VerifyAndClearExpectations(&callback_);
-
- // First mark half of the congested packets as cleared and make sure that just
- // as many are sent
- int ack_count = kCongestionCount / 2;
- EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count);
- pacer_->UpdateOutstandingData(
- DataSize::Bytes(kCongestionWindow - kPacketSize * ack_count));
-
- for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
- clock_.AdvanceTimeMilliseconds(5);
- pacer_->ProcessPackets();
- }
- unacked_packets -= ack_count;
- ::testing::Mock::VerifyAndClearExpectations(&callback_);
-
- // Second make sure all packets are sent if sent packets are continuously
- // marked as acked.
- EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _))
- .Times(unacked_packets);
- for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
- pacer_->UpdateOutstandingData(DataSize::Zero());
- clock_.AdvanceTimeMilliseconds(5);
- pacer_->ProcessPackets();
- }
-}
-
TEST_P(PacingControllerTest, Pause) {
uint32_t ssrc_low_priority = 12345;
uint32_t ssrc = 12346;
diff --git a/modules/pacing/rtp_packet_pacer.h b/modules/pacing/rtp_packet_pacer.h
index 3dc2b27..a201838 100644
--- a/modules/pacing/rtp_packet_pacer.h
+++ b/modules/pacing/rtp_packet_pacer.h
@@ -34,8 +34,7 @@
// Resume sending packets.
virtual void Resume() = 0;
- virtual void SetCongestionWindow(DataSize congestion_window_size) = 0;
- virtual void UpdateOutstandingData(DataSize outstanding_data) = 0;
+ virtual void SetCongested(bool congested) = 0;
// Sets the pacing rates. Must be called once before packets can be sent.
virtual void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) = 0;
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index 620a541..c2b376c 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -105,28 +105,10 @@
});
}
-void TaskQueuePacedSender::SetCongestionWindow(
- DataSize congestion_window_size) {
- task_queue_.PostTask([this, congestion_window_size]() {
+void TaskQueuePacedSender::SetCongested(bool congested) {
+ task_queue_.PostTask([this, congested]() {
RTC_DCHECK_RUN_ON(&task_queue_);
- pacing_controller_.SetCongestionWindow(congestion_window_size);
- MaybeProcessPackets(Timestamp::MinusInfinity());
- });
-}
-
-void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
- if (task_queue_.IsCurrent()) {
- RTC_DCHECK_RUN_ON(&task_queue_);
- // Fast path since this can be called once per sent packet while on the
- // task queue.
- pacing_controller_.UpdateOutstandingData(outstanding_data);
- MaybeProcessPackets(Timestamp::MinusInfinity());
- return;
- }
-
- task_queue_.PostTask([this, outstanding_data]() {
- RTC_DCHECK_RUN_ON(&task_queue_);
- pacing_controller_.UpdateOutstandingData(outstanding_data);
+ pacing_controller_.SetCongested(congested);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 61a6255..33d7b5e 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -86,8 +86,7 @@
// Resume sending packets.
void Resume() override;
- void SetCongestionWindow(DataSize congestion_window_size) override;
- void UpdateOutstandingData(DataSize outstanding_data) override;
+ void SetCongested(bool congested) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;