Add support for dynamic processing mode in PacedSender.
Behind a default-disabled field trial.
Bug: webrtc:10809
Change-Id: If5d9b69721bd67e59e68b1026e3797e9a1b0a760
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159783
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29802}
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index fad9018..56eed92 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -32,11 +32,16 @@
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
ProcessThread* process_thread)
- : pacing_controller_(clock,
+ : process_mode_((field_trials != nullptr &&
+ field_trials->Lookup("WebRTC-Pacer-DynamicProcess")
+ .find("Enabled") == 0)
+ ? PacingController::ProcessMode::kDynamic
+ : PacingController::ProcessMode::kPeriodic),
+ pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
event_log,
field_trials,
- PacingController::ProcessMode::kPeriodic),
+ process_mode_),
clock_(clock),
packet_router_(packet_router),
process_thread_(process_thread) {
@@ -45,8 +50,9 @@
}
PacedSender::~PacedSender() {
- if (process_thread_)
+ if (process_thread_) {
process_thread_->DeRegisterModule(&module_proxy_);
+ }
}
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
@@ -62,8 +68,9 @@
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new (longer) estimate for when to call Process().
- if (process_thread_)
+ if (process_thread_) {
process_thread_->WakeUp(&module_proxy_);
+ }
}
void PacedSender::Resume() {
@@ -74,31 +81,44 @@
// Tell the process thread to call our TimeUntilNextProcess() method to
// refresh the estimate for when to call Process().
- if (process_thread_)
+ if (process_thread_) {
process_thread_->WakeUp(&module_proxy_);
+ }
}
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
- rtc::CritScope cs(&critsect_);
- pacing_controller_.SetCongestionWindow(congestion_window_size);
+ {
+ rtc::CritScope cs(&critsect_);
+ pacing_controller_.SetCongestionWindow(congestion_window_size);
+ }
+ MaybeWakupProcessThread();
}
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
- rtc::CritScope cs(&critsect_);
- pacing_controller_.UpdateOutstandingData(outstanding_data);
+ {
+ rtc::CritScope cs(&critsect_);
+ pacing_controller_.UpdateOutstandingData(outstanding_data);
+ }
+ MaybeWakupProcessThread();
}
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
- rtc::CritScope cs(&critsect_);
- pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+ {
+ rtc::CritScope cs(&critsect_);
+ pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+ }
+ MaybeWakupProcessThread();
}
void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
- rtc::CritScope cs(&critsect_);
- for (auto& packet : packets) {
- pacing_controller_.EnqueuePacket(std::move(packet));
+ {
+ rtc::CritScope cs(&critsect_);
+ for (auto& packet : packets) {
+ pacing_controller_.EnqueuePacket(std::move(packet));
+ }
}
+ MaybeWakupProcessThread();
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@@ -144,9 +164,21 @@
RTC_DCHECK(!process_thread || process_thread == process_thread_);
}
+void PacedSender::MaybeWakupProcessThread() {
+ // Tell the process thread to call our TimeUntilNextProcess() method to get
+ // a new time for when to call Process().
+ if (process_thread_ &&
+ process_mode_ == PacingController::ProcessMode::kDynamic) {
+ process_thread_->WakeUp(&module_proxy_);
+ }
+}
+
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
- rtc::CritScope cs(&critsect_);
- pacing_controller_.SetQueueTimeLimit(limit);
+ {
+ rtc::CritScope cs(&critsect_);
+ pacing_controller_.SetQueueTimeLimit(limit);
+ }
+ MaybeWakupProcessThread();
}
void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 3539c53..06a6c26 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -134,9 +134,10 @@
// Called when the prober is associated with a process thread.
void ProcessThreadAttached(ProcessThread* process_thread) override;
- private:
- // Methods implementing PacedSenderController:PacketSender.
+ // In dynamic process mode, refreshes the next process time.
+ void MaybeWakupProcessThread();
+ // Methods implementing PacedSenderController:PacketSender.
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
@@ -163,6 +164,7 @@
} module_proxy_{this};
rtc::CriticalSection critsect_;
+ const PacingController::ProcessMode process_mode_;
PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
Clock* const clock_;
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index feb6c07..23f1d60 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -39,7 +39,6 @@
namespace webrtc {
namespace test {
-
// Mock callback implementing the raw api.
class MockCallback : public PacketRouter {
public:
@@ -51,69 +50,88 @@
std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
};
-std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
- auto packet = std::make_unique<RtpPacketToSend>(nullptr);
- packet->set_packet_type(type);
- switch (type) {
- case RtpPacketToSend::Type::kAudio:
- packet->SetSsrc(kAudioSsrc);
- break;
- case RtpPacketToSend::Type::kVideo:
- packet->SetSsrc(kVideoSsrc);
- break;
- case RtpPacketToSend::Type::kRetransmission:
- case RtpPacketToSend::Type::kPadding:
- packet->SetSsrc(kVideoRtxSsrc);
- break;
- case RtpPacketToSend::Type::kForwardErrorCorrection:
- packet->SetSsrc(kFlexFecSsrc);
- break;
+class PacedSenderTest
+ : public ::testing::TestWithParam<PacingController::ProcessMode> {
+ public:
+ PacedSenderTest() : clock_(0), paced_module_(nullptr) {}
+
+ void SetUp() override {
+ EXPECT_CALL(process_thread_, RegisterModule)
+ .WillOnce(SaveArg<0>(&paced_module_));
+
+ pacer_ = std::make_unique<PacedSender>(&clock_, &callback_, nullptr,
+ nullptr, &process_thread_);
+ EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1);
}
- packet->SetPayloadSize(kDefaultPacketSize);
- return packet;
-}
+ protected:
+ std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+ auto packet = std::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ packet->SetSsrc(kAudioSsrc);
+ break;
+ case RtpPacketToSend::Type::kVideo:
+ packet->SetSsrc(kVideoSsrc);
+ break;
+ case RtpPacketToSend::Type::kRetransmission:
+ case RtpPacketToSend::Type::kPadding:
+ packet->SetSsrc(kVideoRtxSsrc);
+ break;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ packet->SetSsrc(kFlexFecSsrc);
+ break;
+ }
-TEST(PacedSenderTest, PacesPackets) {
- SimulatedClock clock(0);
- MockCallback callback;
- MockProcessThread process_thread;
- Module* paced_module = nullptr;
- EXPECT_CALL(process_thread, RegisterModule(_, _))
- .WillOnce(SaveArg<0>(&paced_module));
- PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread);
- EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1);
+ packet->SetPayloadSize(kDefaultPacketSize);
+ return packet;
+ }
+ SimulatedClock clock_;
+ MockCallback callback_;
+ MockProcessThread process_thread_;
+ Module* paced_module_;
+ std::unique_ptr<PacedSender> pacer_;
+};
+
+TEST_P(PacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
- pacer.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
- DataRate::Zero());
+ pacer_->SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
+ DataRate::Zero());
std::vector<std::unique_ptr<RtpPacketToSend>> packets;
for (size_t i = 0; i < kPacketsToSend; ++i) {
packets.emplace_back(BuildRtpPacket(RtpPacketToSend::Type::kVideo));
}
- pacer.EnqueuePackets(std::move(packets));
+ pacer_->EnqueuePackets(std::move(packets));
// Expect all of them to be sent.
size_t packets_sent = 0;
- clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
- EXPECT_CALL(callback, SendPacket)
+ clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
+ EXPECT_CALL(callback_, SendPacket)
.WillRepeatedly(
[&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) { ++packets_sent; });
- const Timestamp start_time = clock.CurrentTime();
+ const Timestamp start_time = clock_.CurrentTime();
while (packets_sent < kPacketsToSend) {
- clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess());
- paced_module->Process();
+ clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
+ paced_module_->Process();
}
// Packets should be sent over a period of close to 1s. Expect a little lower
// than this since initial probing is a bit quicker.
- TimeDelta duration = clock.CurrentTime() - start_time;
+ TimeDelta duration = clock_.CurrentTime() - start_time;
EXPECT_GT(duration, TimeDelta::ms(900));
}
+INSTANTIATE_TEST_SUITE_P(
+ WithAndWithoutDynamicProcess,
+ PacedSenderTest,
+ ::testing::Values(PacingController::ProcessMode::kPeriodic,
+ PacingController::ProcessMode::kDynamic));
+
} // namespace test
} // namespace webrtc