Optionally allows TaskQueuePacedSender to coalesce send events.
With an optional parameter this allows the task-queue based paced
sender to mimic the old behavior and coalesce sending of packets in
order to reduce thread wakeups and provide opportunity for batching.
This is done by simply overriding the minimum time the thread should
sleep. The pacing controller will already handle the "late wakup" case
and send any packets as if it had been woken at the optimal time.
Bug: webrtc:10809
Change-Id: Iceea00693a4e87d39b0e0ee8bdabca081dff2cba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175648
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31328}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 56c5e55..9baf164 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -91,13 +91,16 @@
event_log,
trials,
process_thread_.get())),
- task_queue_pacer_(use_task_queue_pacer_
- ? new TaskQueuePacedSender(clock,
- &packet_router_,
- event_log,
- trials,
- task_queue_factory)
- : nullptr),
+ task_queue_pacer_(
+ use_task_queue_pacer_
+ ? new TaskQueuePacedSender(
+ clock,
+ &packet_router_,
+ event_log,
+ trials,
+ task_queue_factory,
+ /*hold_back_window = */ PacingController::kMinSleepTime)
+ : nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index f21e637..b1f6e89 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -193,6 +193,10 @@
return false;
}
+bool PacingController::IsProbing() const {
+ return prober_.is_probing();
+}
+
Timestamp PacingController::CurrentTime() const {
Timestamp time = clock_->CurrentTime();
if (time < last_timestamp_) {
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 27f1614..20d2539 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -146,6 +146,8 @@
bool Congested() const;
+ bool IsProbing() const;
+
private:
void EnqueuePacketInternal(std::unique_ptr<RtpPacketToSend> packet,
int priority);
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index a4ce9fe..d460d60 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -34,8 +34,10 @@
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
- TaskQueueFactory* task_queue_factory)
+ TaskQueueFactory* task_queue_factory,
+ TimeDelta hold_back_window)
: clock_(clock),
+ hold_back_window_(hold_back_window),
packet_router_(packet_router),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
@@ -200,8 +202,10 @@
next_process_time = pacing_controller_.NextSendTime();
}
- next_process_time =
- std::max(now + PacingController::kMinSleepTime, next_process_time);
+ const TimeDelta min_sleep = pacing_controller_.IsProbing()
+ ? PacingController::kMinSleepTime
+ : hold_back_window_;
+ next_process_time = std::max(now + min_sleep, next_process_time);
TimeDelta sleep_time = next_process_time - now;
if (next_process_time_.IsMinusInfinity() ||
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 8b47f5e..3241d3f 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -42,11 +42,18 @@
public RtpPacketSender,
private PacingController::PacketSender {
public:
- TaskQueuePacedSender(Clock* clock,
- PacketRouter* packet_router,
- RtcEventLog* event_log,
- const WebRtcKeyValueConfig* field_trials,
- TaskQueueFactory* task_queue_factory);
+ // The |hold_back_window| parameter sets a lower bound on time to sleep if
+ // there is currently a pacer queue and packets can't immediately be
+ // processed. Increasing this reduces thread wakeups at the expense of higher
+ // latency.
+ // TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window.
+ TaskQueuePacedSender(
+ Clock* clock,
+ PacketRouter* packet_router,
+ RtcEventLog* event_log,
+ const WebRtcKeyValueConfig* field_trials,
+ TaskQueueFactory* task_queue_factory,
+ TimeDelta hold_back_window = PacingController::kMinSleepTime);
~TaskQueuePacedSender() override;
@@ -131,6 +138,7 @@
Stats GetStats() const;
Clock* const clock_;
+ const TimeDelta hold_back_window_;
PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
index ba2aad2..e93f776 100644
--- a/modules/pacing/task_queue_paced_sender_unittest.cc
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -24,6 +24,7 @@
#include "test/time_controller/simulated_time_controller.h"
using ::testing::_;
+using ::testing::AtLeast;
using ::testing::Return;
using ::testing::SaveArg;
@@ -48,17 +49,6 @@
namespace test {
-class TaskQueuePacedSenderTest : public ::testing::Test {
- public:
- TaskQueuePacedSenderTest()
- : time_controller_(Timestamp::Millis(1234)),
- pacer_(time_controller_.GetClock(),
- &packet_router_,
- /*event_log=*/nullptr,
- /*field_trials=*/nullptr,
- time_controller_.GetTaskQueueFactory()) {}
-
- protected:
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
@@ -92,109 +82,193 @@
return packets;
}
- Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); }
+ TEST(TaskQueuePacedSenderTest, PacesPackets) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime);
- GlobalSimulatedTimeController time_controller_;
- MockPacketRouter packet_router_;
- TaskQueuePacedSender pacer_;
-};
+ // Insert a number of packets, covering one second.
+ static constexpr size_t kPacketsToSend = 42;
+ pacer.SetPacingRates(
+ DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
+ DataRate::Zero());
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
-TEST_F(TaskQueuePacedSenderTest, PacesPackets) {
- // Insert a number of packets, covering one second.
- static constexpr size_t kPacketsToSend = 42;
- pacer_.SetPacingRates(
- DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
- DataRate::Zero());
- pacer_.EnqueuePackets(
- GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
+ // Expect all of them to be sent.
+ size_t packets_sent = 0;
+ Timestamp end_time = Timestamp::PlusInfinity();
+ EXPECT_CALL(packet_router, SendPacket)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ ++packets_sent;
+ if (packets_sent == kPacketsToSend) {
+ end_time = time_controller.GetClock()->CurrentTime();
+ }
+ });
- // Expect all of them to be sent.
- size_t packets_sent = 0;
- Timestamp end_time = Timestamp::PlusInfinity();
- EXPECT_CALL(packet_router_, SendPacket)
- .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
- const PacedPacketInfo& cluster_info) {
- ++packets_sent;
- if (packets_sent == kPacketsToSend) {
- end_time = time_controller_.GetClock()->CurrentTime();
- }
- });
+ const Timestamp start_time = time_controller.GetClock()->CurrentTime();
- const Timestamp start_time = time_controller_.GetClock()->CurrentTime();
+ // Packets should be sent over a period of close to 1s. Expect a little
+ // lower than this since initial probing is a bit quicker.
+ time_controller.AdvanceTime(TimeDelta::Seconds(1));
+ EXPECT_EQ(packets_sent, kPacketsToSend);
+ ASSERT_TRUE(end_time.IsFinite());
+ EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
+ }
- // Packets should be sent over a period of close to 1s. Expect a little lower
- // than this since initial probing is a bit quicker.
- time_controller_.AdvanceTime(TimeDelta::Seconds(1));
- EXPECT_EQ(packets_sent, kPacketsToSend);
- ASSERT_TRUE(end_time.IsFinite());
- EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
-}
+ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime);
-TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
- // Insert a number of packets to be sent 200ms apart.
- const size_t kPacketsPerSecond = 5;
- const DataRate kPacingRate =
- DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
- pacer_.SetPacingRates(kPacingRate, DataRate::Zero());
+ // Insert a number of packets to be sent 200ms apart.
+ const size_t kPacketsPerSecond = 5;
+ const DataRate kPacingRate =
+ DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
+ pacer.SetPacingRates(kPacingRate, DataRate::Zero());
- // Send some initial packets to be rid of any probes.
- EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond);
- pacer_.EnqueuePackets(
- GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
- time_controller_.AdvanceTime(TimeDelta::Seconds(1));
+ // Send some initial packets to be rid of any probes.
+ EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
+ pacer.EnqueuePackets(
+ GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
+ time_controller.AdvanceTime(TimeDelta::Seconds(1));
- // Insert three packets, and record send time of each of them.
- // After the second packet is sent, double the send rate so we can
- // check the third packets is sent after half the wait time.
- Timestamp first_packet_time = Timestamp::MinusInfinity();
- Timestamp second_packet_time = Timestamp::MinusInfinity();
- Timestamp third_packet_time = Timestamp::MinusInfinity();
+ // Insert three packets, and record send time of each of them.
+ // After the second packet is sent, double the send rate so we can
+ // check the third packets is sent after half the wait time.
+ Timestamp first_packet_time = Timestamp::MinusInfinity();
+ Timestamp second_packet_time = Timestamp::MinusInfinity();
+ Timestamp third_packet_time = Timestamp::MinusInfinity();
- EXPECT_CALL(packet_router_, SendPacket)
- .Times(3)
- .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
- const PacedPacketInfo& cluster_info) {
- if (first_packet_time.IsInfinite()) {
- first_packet_time = CurrentTime();
- } else if (second_packet_time.IsInfinite()) {
- second_packet_time = CurrentTime();
- pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero());
- } else {
- third_packet_time = CurrentTime();
- }
- });
+ EXPECT_CALL(packet_router, SendPacket)
+ .Times(3)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ if (first_packet_time.IsInfinite()) {
+ first_packet_time = time_controller.GetClock()->CurrentTime();
+ } else if (second_packet_time.IsInfinite()) {
+ second_packet_time = time_controller.GetClock()->CurrentTime();
+ pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
+ } else {
+ third_packet_time = time_controller.GetClock()->CurrentTime();
+ }
+ });
- pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
- time_controller_.AdvanceTime(TimeDelta::Millis(500));
- ASSERT_TRUE(third_packet_time.IsFinite());
- EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
- 1.0);
- EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
- 1.0);
-}
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
+ time_controller.AdvanceTime(TimeDelta::Millis(500));
+ ASSERT_TRUE(third_packet_time.IsFinite());
+ EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
+ 1.0);
+ EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
+ 1.0);
+ }
-TEST_F(TaskQueuePacedSenderTest, SendsAudioImmediately) {
- const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
- const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
- const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
+ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ PacingController::kMinSleepTime);
- pacer_.SetPacingRates(kPacingDataRate, DataRate::Zero());
+ const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
- // Add some initial video packets, only one should be sent.
- EXPECT_CALL(packet_router_, SendPacket);
- pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
- time_controller_.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router_);
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
- // Advance time, but still before next packet should be sent.
- time_controller_.AdvanceTime(kPacketPacingTime / 2);
+ // Add some initial video packets, only one should be sent.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
- // Insert an audio packet, it should be sent immediately.
- EXPECT_CALL(packet_router_, SendPacket);
- pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
- time_controller_.AdvanceTime(TimeDelta::Zero());
- ::testing::Mock::VerifyAndClearExpectations(&packet_router_);
-}
+ // Advance time, but still before next packet should be sent.
+ time_controller.AdvanceTime(kPacketPacingTime / 2);
+
+ // Insert an audio packet, it should be sent immediately.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+ }
+
+ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+
+ // Add 10 packets. The first should be sent immediately since the buffers
+ // are clear.
+ EXPECT_CALL(packet_router, SendPacket);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+
+ // Advance time to 1ms before the coalescing window ends. No packets should
+ // be sent.
+ EXPECT_CALL(packet_router, SendPacket).Times(0);
+ time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
+
+ // Advance time to where coalescing window ends. All packets that should
+ // have been sent up til now will be sent.
+ EXPECT_CALL(packet_router, SendPacket).Times(5);
+ time_controller.AdvanceTime(TimeDelta::Millis(1));
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+ }
+
+ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
+ const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
+ GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
+ MockPacketRouter packet_router;
+ TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller.GetTaskQueueFactory(),
+ kCoalescingWindow);
+
+ // Set rates so one packet adds one ms of buffer level.
+ const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
+ const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
+ const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
+
+ pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
+
+ // Add 10 packets. The first should be sent immediately since the buffers
+ // are clear. This will also trigger the probe to start.
+ EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
+ pacer.CreateProbeCluster(kPacingDataRate * 2, 17);
+ pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
+ time_controller.AdvanceTime(TimeDelta::Zero());
+ ::testing::Mock::VerifyAndClearExpectations(&packet_router);
+
+ // Advance time to 1ms before the coalescing window ends. Packets should be
+ // flying.
+ EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
+ time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
+ }
} // namespace test
} // namespace webrtc