Add ability to control TaskQueuePacedSender holdback window.
Holdback window can be specified as absolute time and in terms of packet
send times. Example:
WebRTC-TaskQueuePacer/Enabled,holdback_window:20ms,holdback_packet:3/
If current conditions have us running with 2000kbps pacing rate and
1250byte (10kbit) packets, each packet send time is 5ms.
The holdback window would then be min(20ms, 3*5ms) = 15ms.
The default is like before 1ms and packets no take into account when
TQ pacer is used, parameters have no effect with legacy process thread
pacer.
Bug: webrtc:10809
Change-Id: I800de05107e2d4df461eabaaf1ca04fb4c5de51e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/233421
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35266}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 9d27b33..8825df2 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -75,6 +75,15 @@
} // namespace
+RtpTransportControllerSend::PacerSettings::PacerSettings(
+ const WebRtcKeyValueConfig* trials)
+ : tq_disabled("Disabled"),
+ holdback_window("holdback_window", PacingController::kMinSleepTime),
+ holdback_packets("holdback_packets", -1) {
+ ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
+ trials->Lookup("WebRTC-TaskQueuePacer"));
+}
+
RtpTransportControllerSend::RtpTransportControllerSend(
Clock* clock,
webrtc::RtcEventLog* event_log,
@@ -89,8 +98,8 @@
bitrate_configurator_(bitrate_config),
pacer_started_(false),
process_thread_(std::move(process_thread)),
- use_task_queue_pacer_(!IsDisabled(trials, "WebRTC-TaskQueuePacer")),
- process_thread_pacer_(use_task_queue_pacer_
+ pacer_settings_(trials),
+ process_thread_pacer_(pacer_settings_.use_task_queue_pacer()
? nullptr
: new PacedSender(clock,
&packet_router_,
@@ -98,14 +107,14 @@
trials,
process_thread_.get())),
task_queue_pacer_(
- use_task_queue_pacer_
- ? new TaskQueuePacedSender(
- clock,
- &packet_router_,
- event_log,
- trials,
- task_queue_factory,
- /*hold_back_window = */ PacingController::kMinSleepTime)
+ pacer_settings_.use_task_queue_pacer()
+ ? new TaskQueuePacedSender(clock,
+ &packet_router_,
+ event_log,
+ trials,
+ task_queue_factory,
+ pacer_settings_.holdback_window.Get(),
+ pacer_settings_.holdback_packets.Get())
: nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
@@ -194,14 +203,14 @@
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
- if (use_task_queue_pacer_) {
+ if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
- if (use_task_queue_pacer_) {
+ if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
@@ -226,7 +235,7 @@
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
- if (use_task_queue_pacer_) {
+ if (pacer_settings_.use_task_queue_pacer()) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
@@ -503,7 +512,7 @@
void RtpTransportControllerSend::EnsureStarted() {
if (!pacer_started_) {
pacer_started_ = true;
- if (use_task_queue_pacer_) {
+ if (pacer_settings_.use_task_queue_pacer()) {
task_queue_pacer_->EnsureStarted();
} else {
process_thread_->Start();
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index f1b90c7..ac4213d 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -128,6 +128,16 @@
void OnRemoteNetworkEstimate(NetworkStateEstimate estimate) override;
private:
+ struct PacerSettings {
+ explicit PacerSettings(const WebRtcKeyValueConfig* trials);
+
+ bool use_task_queue_pacer() const { return !tq_disabled.Get(); }
+
+ FieldTrialFlag tq_disabled; // Kill-switch not normally used.
+ FieldTrialParameter<TimeDelta> holdback_window;
+ FieldTrialParameter<int> holdback_packets;
+ };
+
void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
void UpdateInitialConstraints(TargetRateConstraints new_contraints)
RTC_RUN_ON(task_queue_);
@@ -158,7 +168,7 @@
std::map<std::string, rtc::NetworkRoute> network_routes_;
bool pacer_started_;
const std::unique_ptr<ProcessThread> process_thread_;
- const bool use_task_queue_pacer_;
+ const PacerSettings pacer_settings_;
std::unique_ptr<PacedSender> process_thread_pacer_;
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 0787105..0653356 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -48,6 +48,7 @@
"../../logging:rtc_event_pacing",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",
+ "../../rtc_base:rtc_numerics",
"../../rtc_base:rtc_task_queue",
"../../rtc_base/experiments:field_trial_parser",
"../../rtc_base/synchronization:mutex",
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 38bb9e5..aade932 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -103,6 +103,7 @@
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
+ DataRate pacing_rate() const { return pacing_bitrate_; }
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index 515cba3..f2de9ec 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -36,9 +36,11 @@
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
- TimeDelta hold_back_window)
+ TimeDelta max_hold_back_window,
+ int max_hold_back_window_in_packets)
: clock_(clock),
- hold_back_window_(hold_back_window),
+ max_hold_back_window_(max_hold_back_window),
+ max_hold_back_window_in_packets_(max_hold_back_window_in_packets),
pacing_controller_(clock,
packet_sender,
event_log,
@@ -48,9 +50,12 @@
stats_update_scheduled_(false),
last_stats_time_(Timestamp::MinusInfinity()),
is_shutdown_(false),
+ packet_size_(/*alpha=*/0.95),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
- TaskQueueFactory::Priority::NORMAL)) {}
+ TaskQueueFactory::Priority::NORMAL)) {
+ packet_size_.Apply(1, 0);
+}
TaskQueuePacedSender::~TaskQueuePacedSender() {
// Post an immediate task to mark the queue as shutting down.
@@ -144,6 +149,7 @@
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
+ packet_size_.Apply(1, packet->size());
RTC_DCHECK_GE(packet->capture_time_ms(), 0);
pacing_controller_.EnqueuePacket(std::move(packet));
}
@@ -227,6 +233,17 @@
next_process_time = pacing_controller_.NextSendTime();
}
+ TimeDelta hold_back_window = max_hold_back_window_;
+ DataRate pacing_rate = pacing_controller_.pacing_rate();
+ DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered());
+ if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() &&
+ !avg_packet_size.IsZero()) {
+ TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate;
+ hold_back_window =
+ std::min(hold_back_window,
+ avg_packet_send_time * max_hold_back_window_in_packets_);
+ }
+
absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) {
@@ -241,11 +258,11 @@
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
}
} else if (next_process_time_.IsMinusInfinity() ||
- next_process_time <= next_process_time_ - hold_back_window_) {
+ next_process_time <= next_process_time_ - hold_back_window) {
// Schedule a new task since there is none currently scheduled
// (`next_process_time_` is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled.
- time_to_next_process = std::max(next_process_time - now, hold_back_window_);
+ time_to_next_process = std::max(next_process_time - now, hold_back_window);
}
if (time_to_next_process) {
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index d39417b..ebe9846 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -29,6 +29,7 @@
#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
+#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
@@ -43,14 +44,15 @@
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
// latency.
- // TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window.
+ // TODO(bugs.webrtc.org/10809): Remove default values.
TaskQueuePacedSender(
Clock* clock,
PacingController::PacketSender* packet_sender,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
- TimeDelta hold_back_window = PacingController::kMinSleepTime);
+ TimeDelta max_hold_back_window = PacingController::kMinSleepTime,
+ int max_hold_back_window_in_packets = -1);
~TaskQueuePacedSender() override;
@@ -132,7 +134,9 @@
Stats GetStats() const;
Clock* const clock_;
- const TimeDelta hold_back_window_;
+ const TimeDelta max_hold_back_window_;
+ const int max_hold_back_window_in_packets_;
+
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
// We want only one (valid) delayed process task in flight at a time.
@@ -161,6 +165,9 @@
// never drain.
bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
+ // Filtered size of enqueued packets, in bytes.
+ rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
+
mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
index aca1ba0..b921331 100644
--- a/modules/pacing/task_queue_paced_sender_unittest.cc
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -37,6 +37,7 @@
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234;
+constexpr int kNoPacketHoldback = -1;
class MockPacketRouter : public PacketRouter {
public:
@@ -70,13 +71,15 @@
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
- TimeDelta hold_back_window)
+ TimeDelta hold_back_window,
+ int max_hold_back_window_in_packets)
: TaskQueuePacedSender(clock,
packet_router,
event_log,
field_trials,
task_queue_factory,
- hold_back_window) {}
+ hold_back_window,
+ max_hold_back_window_in_packets) {}
void OnStatsUpdated(const Stats& stats) override {
++num_stats_updates_;
@@ -110,484 +113,579 @@
namespace test {
- std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
- auto packet = std::make_unique<RtpPacketToSend>(nullptr);
- packet->set_packet_type(type);
- switch (type) {
- case RtpPacketMediaType::kAudio:
- packet->SetSsrc(kAudioSsrc);
- break;
- case RtpPacketMediaType::kVideo:
- packet->SetSsrc(kVideoSsrc);
- break;
- case RtpPacketMediaType::kRetransmission:
- case RtpPacketMediaType::kPadding:
- packet->SetSsrc(kVideoRtxSsrc);
- break;
- case RtpPacketMediaType::kForwardErrorCorrection:
- packet->SetSsrc(kFlexFecSsrc);
- break;
- }
-
- packet->SetPayloadSize(kDefaultPacketSize);
- return packet;
+std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
+ auto packet = std::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ switch (type) {
+ case RtpPacketMediaType::kAudio:
+ packet->SetSsrc(kAudioSsrc);
+ break;
+ case RtpPacketMediaType::kVideo:
+ packet->SetSsrc(kVideoSsrc);
+ break;
+ case RtpPacketMediaType::kRetransmission:
+ case RtpPacketMediaType::kPadding:
+ packet->SetSsrc(kVideoRtxSsrc);
+ break;
+ case RtpPacketMediaType::kForwardErrorCorrection:
+ packet->SetSsrc(kFlexFecSsrc);
+ break;
}
- std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
- RtpPacketMediaType type,
- size_t num_packets) {
- std::vector<std::unique_ptr<RtpPacketToSend>> packets;
- for (size_t i = 0; i < num_packets; ++i) {
- packets.push_back(BuildRtpPacket(type));
- }
- return packets;
+ packet->SetPayloadSize(kDefaultPacketSize);
+ return packet;
+}
+
+std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
+ RtpPacketMediaType type,
+ size_t num_packets) {
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets;
+ for (size_t i = 0; i < num_packets; ++i) {
+ packets.push_back(BuildRtpPacket(type));
}
+ return packets;
+}
- TEST(TaskQueuePacedSenderTest, PacesPackets) {
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- PacingController::kMinSleepTime);
+TEST(TaskQueuePacedSenderTest, PacesPackets) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime, kNoPacketHoldback);
- // Insert a number of packets, covering one second.
- static constexpr size_t kPacketsToSend = 42;
- pacer.SetPacingRates(
- DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
- DataRate::Zero());
- pacer.EnsureStarted();
- pacer.EnqueuePackets(
- GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
+ // Insert a number of packets, covering one second.
+ static constexpr size_t kPacketsToSend = 42;
+ pacer.SetPacingRates(
+ DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
+ DataRate::Zero());
+ pacer.EnsureStarted();
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
- // Expect all of them to be sent.
- size_t packets_sent = 0;
- Timestamp end_time = Timestamp::PlusInfinity();
- EXPECT_CALL(packet_router, SendPacket)
- .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
- const PacedPacketInfo& cluster_info) {
- ++packets_sent;
- if (packets_sent == kPacketsToSend) {
- end_time = time_controller.GetClock()->CurrentTime();
- }
- });
+ // Expect all of them to be sent.
+ size_t packets_sent = 0;
+ Timestamp end_time = Timestamp::PlusInfinity();
+ EXPECT_CALL(packet_router, SendPacket)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ ++packets_sent;
+ if (packets_sent == kPacketsToSend) {
+ end_time = time_controller.GetClock()->CurrentTime();
+ }
+ });
- const Timestamp start_time = time_controller.GetClock()->CurrentTime();
+ const Timestamp start_time = time_controller.GetClock()->CurrentTime();
- // Packets should be sent over a period of close to 1s. Expect a little
- // lower than this since initial probing is a bit quicker.
- time_controller.AdvanceTime(TimeDelta::Seconds(1));
- EXPECT_EQ(packets_sent, kPacketsToSend);
- ASSERT_TRUE(end_time.IsFinite());
- EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
- }
+ // Packets should be sent over a period of close to 1s. Expect a little
+ // lower than this since initial probing is a bit quicker.
+ time_controller.AdvanceTime(TimeDelta::Seconds(1));
+ EXPECT_EQ(packets_sent, kPacketsToSend);
+ ASSERT_TRUE(end_time.IsFinite());
+ EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
+}
- TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- PacingController::kMinSleepTime);
+TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime, kNoPacketHoldback);
- // Insert a number of packets to be sent 200ms apart.
- const size_t kPacketsPerSecond = 5;
- const DataRate kPacingRate =
- DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
- pacer.SetPacingRates(kPacingRate, DataRate::Zero());
- pacer.EnsureStarted();
+ // Insert a number of packets to be sent 200ms apart.
+ const size_t kPacketsPerSecond = 5;
+ const DataRate kPacingRate =
+ DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
+ pacer.SetPacingRates(kPacingRate, DataRate::Zero());
+ pacer.EnsureStarted();
- // Send some initial packets to be rid of any probes.
- EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
- pacer.EnqueuePackets(
- GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
- time_controller.AdvanceTime(TimeDelta::Seconds(1));
+ // Send some initial packets to be rid of any probes.
+ EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
+ time_controller.AdvanceTime(TimeDelta::Seconds(1));
- // Insert three packets, and record send time of each of them.
- // After the second packet is sent, double the send rate so we can
- // check the third packets is sent after half the wait time.
- Timestamp first_packet_time = Timestamp::MinusInfinity();
- Timestamp second_packet_time = Timestamp::MinusInfinity();
- Timestamp third_packet_time = Timestamp::MinusInfinity();
+ // Insert three packets, and record send time of each of them.
+ // After the second packet is sent, double the send rate so we can
+ // check the third packets is sent after half the wait time.
+ Timestamp first_packet_time = Timestamp::MinusInfinity();
+ Timestamp second_packet_time = Timestamp::MinusInfinity();
+ Timestamp third_packet_time = Timestamp::MinusInfinity();
- EXPECT_CALL(packet_router, SendPacket)
- .Times(3)
- .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
- const PacedPacketInfo& cluster_info) {
- if (first_packet_time.IsInfinite()) {
- first_packet_time = time_controller.GetClock()->CurrentTime();
- } else if (second_packet_time.IsInfinite()) {
- second_packet_time = time_controller.GetClock()->CurrentTime();
- pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
- } else {
- third_packet_time = time_controller.GetClock()->CurrentTime();
- }
- });
+ EXPECT_CALL(packet_router, SendPacket)
+ .Times(3)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ if (first_packet_time.IsInfinite()) {
+ first_packet_time = time_controller.GetClock()->CurrentTime();
+ } else if (second_packet_time.IsInfinite()) {
+ second_packet_time = time_controller.GetClock()->CurrentTime();
+ pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
+ } else {
+ third_packet_time = time_controller.GetClock()->CurrentTime();
+ }
+ });
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
- time_controller.AdvanceTime(TimeDelta::Millis(500));
- ASSERT_TRUE(third_packet_time.IsFinite());
- EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
- 1.0);
- EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
- 1.0);
- }
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
+ time_controller.AdvanceTime(TimeDelta::Millis(500));
+ ASSERT_TRUE(third_packet_time.IsFinite());
+ EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
+ 1.0);
+ EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
+ 1.0);
+}
- TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- PacingController::kMinSleepTime);
+TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime, kNoPacketHoldback);
- const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
+ const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- pacer.EnsureStarted();
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ pacer.EnsureStarted();
- // Add some initial video packets, only one should be sent.
- EXPECT_CALL(packet_router, SendPacket);
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ // Add some initial video packets, only one should be sent.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+
+ // Advance time, but still before next packet should be sent.
+ time_controller.AdvanceTime(kPacketPacingTime / 2);
+
+ // Insert an audio packet, it should be sent immediately.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+}
+
+TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow, kNoPacketHoldback);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ pacer.EnsureStarted();
+
+ // Add 10 packets. The first should be sent immediately since the buffers
+ // are clear.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+
+ // Advance time to 1ms before the coalescing window ends. No packets should
+ // be sent.
+ EXPECT_CALL(packet_router, SendPacket).Times(0);
+ time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
+
+ // Advance time to where coalescing window ends. All packets that should
+ // have been sent up til now will be sent.
+ EXPECT_CALL(packet_router, SendPacket).Times(5);
+ time_controller.AdvanceTime(TimeDelta::Millis(1));
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+}
+
+TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow, kNoPacketHoldback);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ pacer.EnsureStarted();
+
+ // Add 10 packets. The first should be sent immediately since the buffers
+ // are clear. This will also trigger the probe to start.
+ EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
+ pacer.CreateProbeCluster(kPacingDataRate * 2, 17);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+
+ // Advance time to 1ms before the coalescing window ends. Packets should be
+ // flying.
+ EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
+ time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
+}
+
+TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow, kNoPacketHoldback);
+ const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ pacer.EnsureStarted();
+
+ const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
+
+ // Nothing inserted, no stats updates yet.
+ EXPECT_EQ(pacer.num_stats_updates_, 0u);
+
+ // Insert one packet, stats should be updated.
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ EXPECT_EQ(pacer.num_stats_updates_, 1u);
+
+ // Advance time half of the min stats update interval, and trigger a
+ // refresh - stats should not be updated yet.
+ time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
+ pacer.EnqueuePackets({});
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ EXPECT_EQ(pacer.num_stats_updates_, 1u);
+
+ // Advance time the next half, now stats update is triggered.
+ time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
+ pacer.EnqueuePackets({});
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ EXPECT_EQ(pacer.num_stats_updates_, 2u);
+}
+
+TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow, kNoPacketHoldback);
+
+ // Set rates so one packet adds 10ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(10);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+ const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
+ const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
+
+ // Nothing inserted, no stats updates yet.
+ size_t num_expected_stats_updates = 0;
+ EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ pacer.EnsureStarted();
+ time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
+ // Updating pacing rates refreshes stats.
+ EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
+
+ // Record time when we insert first packet, this triggers the scheduled
+ // stats updating.
+ Clock* const clock = time_controller.GetClock();
+ const Timestamp start_time = clock->CurrentTime();
+
+ while (clock->CurrentTime() - start_time <=
+ kMaxTimeBetweenStatsUpdates - kPacketPacingTime) {
+ // Enqueue packet, expect stats update.
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
time_controller.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+ EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
- // Advance time, but still before next packet should be sent.
+ // Advance time to halfway through pacing time, expect another stats
+ // update.
time_controller.AdvanceTime(kPacketPacingTime / 2);
-
- // Insert an audio packet, it should be sent immediately.
- EXPECT_CALL(packet_router, SendPacket);
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
- time_controller.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router);
- }
-
- TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
- const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- kCoalescingWindow);
-
- // Set rates so one packet adds one ms of buffer level.
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
- const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
-
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- pacer.EnsureStarted();
-
- // Add 10 packets. The first should be sent immediately since the buffers
- // are clear.
- EXPECT_CALL(packet_router, SendPacket);
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
- time_controller.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router);
-
- // Advance time to 1ms before the coalescing window ends. No packets should
- // be sent.
- EXPECT_CALL(packet_router, SendPacket).Times(0);
- time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
-
- // Advance time to where coalescing window ends. All packets that should
- // have been sent up til now will be sent.
- EXPECT_CALL(packet_router, SendPacket).Times(5);
- time_controller.AdvanceTime(TimeDelta::Millis(1));
- ::testing::Mock::VerifyAndClearExpectations(&packet_router);
- }
-
- TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
- const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- kCoalescingWindow);
-
- // Set rates so one packet adds one ms of buffer level.
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
- const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
-
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- pacer.EnsureStarted();
-
- // Add 10 packets. The first should be sent immediately since the buffers
- // are clear. This will also trigger the probe to start.
- EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
- pacer.CreateProbeCluster(kPacingDataRate * 2, 17);
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
- time_controller.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router);
-
- // Advance time to 1ms before the coalescing window ends. Packets should be
- // flying.
- EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
- time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
- }
-
- TEST(TaskQueuePacedSenderTest, RespectedMinTimeBetweenStatsUpdates) {
- const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- kCoalescingWindow);
- const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- pacer.EnsureStarted();
-
- const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
-
- // Nothing inserted, no stats updates yet.
- EXPECT_EQ(pacer.num_stats_updates_, 0u);
-
- // Insert one packet, stats should be updated.
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
- time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, 1u);
-
- // Advance time half of the min stats update interval, and trigger a
- // refresh - stats should not be updated yet.
- time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
pacer.EnqueuePackets({});
time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, 1u);
-
- // Advance time the next half, now stats update is triggered.
- time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates / 2);
- pacer.EnqueuePackets({});
- time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, 2u);
- }
-
- TEST(TaskQueuePacedSenderTest, ThrottlesStatsUpdates) {
- const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- kCoalescingWindow);
-
- // Set rates so one packet adds 10ms of buffer level.
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = TimeDelta::Millis(10);
- const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
- const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
- const TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis(33);
-
- // Nothing inserted, no stats updates yet.
- size_t num_expected_stats_updates = 0;
- EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- pacer.EnsureStarted();
- time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
- // Updating pacing rates refreshes stats.
EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
- // Record time when we insert first packet, this triggers the scheduled
- // stats updating.
- Clock* const clock = time_controller.GetClock();
- const Timestamp start_time = clock->CurrentTime();
-
- while (clock->CurrentTime() - start_time <=
- kMaxTimeBetweenStatsUpdates - kPacketPacingTime) {
- // Enqueue packet, expect stats update.
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
- time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
-
- // Advance time to halfway through pacing time, expect another stats
- // update.
- time_controller.AdvanceTime(kPacketPacingTime / 2);
- pacer.EnqueuePackets({});
- time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
-
- // Advance time the rest of the way.
- time_controller.AdvanceTime(kPacketPacingTime / 2);
- }
-
- // At this point, the pace queue is drained so there is no more intersting
- // update to be made - but there is still as schduled task that should run
- // `kMaxTimeBetweenStatsUpdates` after the first update.
- time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates -
- clock->CurrentTime());
- EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
-
- // Advance time a significant time - don't expect any more calls as stats
- // updating does not happen when queue is drained.
- time_controller.AdvanceTime(TimeDelta::Millis(400));
- EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
+ // Advance time the rest of the way.
+ time_controller.AdvanceTime(kPacketPacingTime / 2);
}
- TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
- ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- PacingController::kMinSleepTime);
+ // At this point, the pace queue is drained so there is no more intersting
+ // update to be made - but there is still as schduled task that should run
+ // `kMaxTimeBetweenStatsUpdates` after the first update.
+ time_controller.AdvanceTime(start_time + kMaxTimeBetweenStatsUpdates -
+ clock->CurrentTime());
+ EXPECT_EQ(pacer.num_stats_updates_, ++num_expected_stats_updates);
- // Set rates so one packet adds 4ms of buffer level.
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
- const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
- pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
- pacer.EnsureStarted();
- EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
- return std::vector<std::unique_ptr<RtpPacketToSend>>();
- });
- EXPECT_CALL(packet_router, GeneratePadding(_))
- .WillRepeatedly(
- [](DataSize target_size) { return GeneratePadding(target_size); });
+ // Advance time a significant time - don't expect any more calls as stats
+ // updating does not happen when queue is drained.
+ time_controller.AdvanceTime(TimeDelta::Millis(400));
+ EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates);
+}
- // Enqueue two packets, only the first is sent immediately and the next
- // will be scheduled for sending in 4ms.
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
- const int kNotAProbe = PacedPacketInfo::kNotAProbe;
- EXPECT_CALL(
- packet_router,
- SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
- kNotAProbe)));
- // Advance to less than 3ms before next packet send time.
- time_controller.AdvanceTime(TimeDelta::Micros(1001));
+TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
+ ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime, kNoPacketHoldback);
- // Trigger a probe at 4x the current pacing rate and insert the number of
- // packets the probe needs.
- const DataRate kProbeRate = 2 * kPacingDataRate;
- const int kProbeClusterId = 1;
- pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
+ // Set rates so one packet adds 4ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+ pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
+ pacer.EnsureStarted();
+ EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
+ return std::vector<std::unique_ptr<RtpPacketToSend>>();
+ });
+ EXPECT_CALL(packet_router, GeneratePadding(_))
+ .WillRepeatedly(
+ [](DataSize target_size) { return GeneratePadding(target_size); });
- // Expected size for each probe in a cluster is twice the expected bits
- // sent during min_probe_delta.
- // Expect one additional call since probe always starts with a small
- const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
- const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
- const size_t kNumPacketsInProbe =
- (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
- EXPECT_CALL(
- packet_router,
- SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
- kProbeClusterId)))
- .Times(kNumPacketsInProbe + 1);
+ // Enqueue two packets, only the first is sent immediately and the next
+ // will be scheduled for sending in 4ms.
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2));
+ const int kNotAProbe = PacedPacketInfo::kNotAProbe;
+ EXPECT_CALL(packet_router,
+ SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
+ kNotAProbe)));
+ // Advance to less than 3ms before next packet send time.
+ time_controller.AdvanceTime(TimeDelta::Micros(1001));
- pacer.EnqueuePackets(
- GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe));
- time_controller.AdvanceTime(TimeDelta::Zero());
+ // Trigger a probe at 4x the current pacing rate and insert the number of
+ // packets the probe needs.
+ const DataRate kProbeRate = 2 * kPacingDataRate;
+ const int kProbeClusterId = 1;
+ pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
- // The pacer should have scheduled the next probe to be sent in
- // kProbeTimeDelta. That there was existing scheduled call less than
- // PacingController::kMinSleepTime before this should not matter.
+ // Expected size for each probe in a cluster is twice the expected bits
+ // sent during min_probe_delta.
+ // Expect one additional call since probe always starts with a small
+ const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
+ const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
+ const size_t kNumPacketsInProbe =
+ (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize;
+ EXPECT_CALL(packet_router,
+ SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
+ kProbeClusterId)))
+ .Times(kNumPacketsInProbe + 1);
- EXPECT_CALL(
- packet_router,
- SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
- kProbeClusterId)))
- .Times(AtLeast(1));
- time_controller.AdvanceTime(TimeDelta::Millis(2));
- }
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe));
+ time_controller.AdvanceTime(TimeDelta::Zero());
- TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
- // Set min_probe_delta to be less than kMinSleepTime (1ms).
- const TimeDelta kMinProbeDelta = TimeDelta::Micros(100);
- ScopedFieldTrials trials(
- "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- PacingController::kMinSleepTime);
+ // The pacer should have scheduled the next probe to be sent in
+ // kProbeTimeDelta. That there was existing scheduled call less than
+ // PacingController::kMinSleepTime before this should not matter.
- // Set rates so one packet adds 4ms of buffer level.
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
- const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
- pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
- pacer.EnsureStarted();
- EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
- return std::vector<std::unique_ptr<RtpPacketToSend>>();
- });
- EXPECT_CALL(packet_router, GeneratePadding)
- .WillRepeatedly(
- [](DataSize target_size) { return GeneratePadding(target_size); });
+ EXPECT_CALL(packet_router,
+ SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
+ kProbeClusterId)))
+ .Times(AtLeast(1));
+ time_controller.AdvanceTime(TimeDelta::Millis(2));
+}
- // Set a high probe rate.
- const int kProbeClusterId = 1;
- DataRate kProbingRate = kPacingDataRate * 10;
- pacer.CreateProbeCluster(kProbingRate, kProbeClusterId);
+TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
+ // Set min_probe_delta to be less than kMinSleepTime (1ms).
+ const TimeDelta kMinProbeDelta = TimeDelta::Micros(100);
+ ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime, kNoPacketHoldback);
- // Advance time less than PacingController::kMinSleepTime, probing packets
- // for the first millisecond should be sent immediately. Min delta between
- // probes is 2x 100us, meaning 4 times per ms we will get least one call to
- // SendPacket().
- DataSize data_sent = DataSize::Zero();
- EXPECT_CALL(
- packet_router,
- SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
- kProbeClusterId)))
- .Times(AtLeast(4))
- .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
- const PacedPacketInfo&) {
- data_sent +=
- DataSize::Bytes(packet->payload_size() + packet->padding_size());
- });
+ // Set rates so one packet adds 4ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(4);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+ pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero());
+ pacer.EnsureStarted();
+ EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
+ return std::vector<std::unique_ptr<RtpPacketToSend>>();
+ });
+ EXPECT_CALL(packet_router, GeneratePadding)
+ .WillRepeatedly(
+ [](DataSize target_size) { return GeneratePadding(target_size); });
- // Add one packet to kickstart probing, the rest will be padding packets.
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
- time_controller.AdvanceTime(kMinProbeDelta);
+ // Set a high probe rate.
+ const int kProbeClusterId = 1;
+ DataRate kProbingRate = kPacingDataRate * 10;
+ pacer.CreateProbeCluster(kProbingRate, kProbeClusterId);
- // Verify the amount of probing data sent.
- // Probe always starts with a small (1 byte) padding packet that's not
- // counted into the probe rate here.
- EXPECT_EQ(data_sent,
- kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
- }
+ // Advance time less than PacingController::kMinSleepTime, probing packets
+ // for the first millisecond should be sent immediately. Min delta between
+ // probes is 2x 100us, meaning 4 times per ms we will get least one call to
+ // SendPacket().
+ DataSize data_sent = DataSize::Zero();
+ EXPECT_CALL(packet_router,
+ SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id,
+ kProbeClusterId)))
+ .Times(AtLeast(4))
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo&) {
+ data_sent +=
+ DataSize::Bytes(packet->payload_size() + packet->padding_size());
+ });
- TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) {
- const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
- GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
- MockPacketRouter packet_router;
- TaskQueuePacedSenderForTest pacer(
- time_controller.GetClock(), &packet_router,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
- kCoalescingWindow);
- const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
- pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ // Add one packet to kickstart probing, the rest will be padding packets.
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
+ time_controller.AdvanceTime(kMinProbeDelta);
- const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
+ // Verify the amount of probing data sent.
+ // Probe always starts with a small (1 byte) padding packet that's not
+ // counted into the probe rate here.
+ EXPECT_EQ(data_sent,
+ kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
+}
- // Nothing inserted, no stats updates yet.
- EXPECT_EQ(pacer.num_stats_updates_, 0u);
+TEST(TaskQueuePacedSenderTest, NoStatsUpdatesBeforeStart) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow, kNoPacketHoldback);
+ const DataRate kPacingDataRate = DataRate::KilobitsPerSec(300);
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- // Insert one packet, stats should not be updated.
- pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
- time_controller.AdvanceTime(TimeDelta::Zero());
- EXPECT_EQ(pacer.num_stats_updates_, 0u);
+ const TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis(1);
- // Advance time of the min stats update interval, and trigger a
- // refresh - stats should not be updated still.
- time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
- EXPECT_EQ(pacer.num_stats_updates_, 0u);
- }
+ // Nothing inserted, no stats updates yet.
+ EXPECT_EQ(pacer.num_stats_updates_, 0u);
+
+ // Insert one packet, stats should not be updated.
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ EXPECT_EQ(pacer.num_stats_updates_, 0u);
+
+ // Advance time of the min stats update interval, and trigger a
+ // refresh - stats should not be updated still.
+ time_controller.AdvanceTime(kMinTimeBetweenStatsUpdates);
+ EXPECT_EQ(pacer.num_stats_updates_, 0u);
+}
+
+TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
+ const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
+ const int kPacketBasedHoldback = 5;
+
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ kFixedCoalescingWindow, kPacketBasedHoldback);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+ const TimeDelta kExpectedHoldbackWindow =
+ kPacketPacingTime * kPacketBasedHoldback;
+ // `kFixedCoalescingWindow` sets the upper bound for the window.
+ ASSERT_GE(kFixedCoalescingWindow, kExpectedHoldbackWindow);
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
+ return std::vector<std::unique_ptr<RtpPacketToSend>>();
+ });
+ pacer.EnsureStarted();
+
+ // Add some packets and wait till all have been sent, so that the pacer
+ // has a valid estimate of packet size.
+ const int kNumWarmupPackets = 40;
+ EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
+ // Wait until all packes have been sent, with a 2x margin.
+ time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
+
+ // Enqueue packets. Expect only the first one to be sent immediately.
+ EXPECT_CALL(packet_router, SendPacket).Times(1);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+
+ // Advance time to 1ms before the coalescing window ends.
+ EXPECT_CALL(packet_router, SendPacket).Times(0);
+ time_controller.AdvanceTime(kExpectedHoldbackWindow - TimeDelta::Millis(1));
+
+ // Advance past where the coalescing window should end.
+ EXPECT_CALL(packet_router, SendPacket).Times(kPacketBasedHoldback - 1);
+ time_controller.AdvanceTime(TimeDelta::Millis(1));
+}
+
+TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
+ const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2);
+ const int kPacketBasedHoldback = 5;
+
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSenderForTest pacer(
+ time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
+ kFixedCoalescingWindow, kPacketBasedHoldback);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+ const TimeDelta kExpectedPacketHoldbackWindow =
+ kPacketPacingTime * kPacketBasedHoldback;
+ // |kFixedCoalescingWindow| sets the upper bound for the window.
+ ASSERT_LT(kFixedCoalescingWindow, kExpectedPacketHoldbackWindow);
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() {
+ return std::vector<std::unique_ptr<RtpPacketToSend>>();
+ });
+ pacer.EnsureStarted();
+
+ // Add some packets and wait till all have been sent, so that the pacer
+ // has a valid estimate of packet size.
+ const int kNumWarmupPackets = 40;
+ EXPECT_CALL(packet_router, SendPacket).Times(kNumWarmupPackets);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kNumWarmupPackets));
+ // Wait until all packes have been sent, with a 2x margin.
+ time_controller.AdvanceTime(kPacketPacingTime * (kNumWarmupPackets * 2));
+
+ // Enqueue packets. Expect onlt the first one to be sent immediately.
+ EXPECT_CALL(packet_router, SendPacket).Times(1);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketBasedHoldback));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+
+ // Advance time to the fixed coalescing window, that should take presedence so
+ // at least some of the packets should be sent.
+ EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
+ time_controller.AdvanceTime(kFixedCoalescingWindow);
+}
+
} // namespace test
} // namespace webrtc