Reland "Fixes dynamic mode pacing issues."
This is a reland of 72e6cb0b3f548900fd3b548b4b6966e3f5ee854f
Was not the cause of perf alert, relanding.
TBR=ilnik@webrtc.org
Original change's description:
> Fixes dynamic mode pacing issues.
>
> This CL fixes a few issues in the (default-disabled) dynamic pacing
> mode:
> * Slight update to sleep timing to avoid short spin loops
> * Removed support for early execution as that lead to time-travel
> contradictions that were difficult to solve.
> * Makes sure we schedule a process call when a packet is due to be
> drained even if the queue is empty, so that padding will start at
> the correct time.
> * While paused or empty, sleep relative last send time if we send
> padding while silent - otherwise just relative to last process
> time.
> * If target send time shifts so far back that packet should have
> been sent prior to the last process, make sure we don't let the
> buffer level remain.
> * Update the PacedSender test to _actually_ use dynamic processing
> when the param says so.
>
> Bug: webrtc:10809
> Change-Id: Iebfde9769647d2390fd192a40bbe2d5bf1f6cc62
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/160407
> Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29911}
Bug: webrtc:10809
Change-Id: Ie7b307e574c2057bb05af87b6718a132d639a416
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/160786
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29928}
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 0a3d3c0..f6c85d4 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -153,7 +153,7 @@
TimeDelta sleep_time =
std::max(TimeDelta::Zero(), next_send_time - clock_->CurrentTime());
if (process_mode_ == PacingController::ProcessMode::kDynamic) {
- return sleep_time.RoundTo(TimeDelta::ms(1)).ms();
+ return std::max(sleep_time, PacingController::kMinSleepTime).ms();
}
return sleep_time.ms();
}
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index 23f1d60..7d1b4cb 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -28,16 +28,13 @@
using ::testing::Return;
using ::testing::SaveArg;
+namespace webrtc {
namespace {
constexpr uint32_t kAudioSsrc = 12345;
constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 234;
-} // namespace
-
-namespace webrtc {
-namespace test {
// Mock callback implementing the raw api.
class MockCallback : public PacketRouter {
@@ -50,17 +47,41 @@
std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
};
+class ProcessModeTrials : public WebRtcKeyValueConfig {
+ public:
+ explicit ProcessModeTrials(bool dynamic_process) : mode_(dynamic_process) {}
+
+ std::string Lookup(absl::string_view key) const override {
+ if (key == "WebRTC-Pacer-DynamicProcess") {
+ return mode_ ? "Enabled" : "Disabled";
+ }
+ return "";
+ }
+
+ private:
+ const bool mode_;
+};
+} // namespace
+
+namespace test {
+
class PacedSenderTest
: public ::testing::TestWithParam<PacingController::ProcessMode> {
public:
- PacedSenderTest() : clock_(0), paced_module_(nullptr) {}
+ PacedSenderTest()
+ : clock_(0),
+ paced_module_(nullptr),
+ trials_(GetParam() == PacingController::ProcessMode::kDynamic) {}
void SetUp() override {
EXPECT_CALL(process_thread_, RegisterModule)
.WillOnce(SaveArg<0>(&paced_module_));
pacer_ = std::make_unique<PacedSender>(&clock_, &callback_, nullptr,
- nullptr, &process_thread_);
+ &trials_, &process_thread_);
+ EXPECT_CALL(process_thread_, WakeUp).WillRepeatedly([&](Module* module) {
+ clock_.AdvanceTimeMilliseconds(module->TimeUntilNextProcess());
+ });
EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1);
}
@@ -92,6 +113,7 @@
MockCallback callback_;
MockProcessThread process_thread_;
Module* paced_module_;
+ ProcessModeTrials trials_;
std::unique_ptr<PacedSender> pacer_;
};
@@ -108,7 +130,6 @@
// Expect all of them to be sent.
size_t packets_sent = 0;
- clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
EXPECT_CALL(callback_, SendPacket)
.WillRepeatedly(
[&](std::unique_ptr<RtpPacketToSend> packet,
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 985fb5c..8be6209 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -276,6 +276,7 @@
if (last_process_time_.IsMinusInfinity()) {
return TimeDelta::Zero();
}
+ RTC_DCHECK_GE(now, last_process_time_);
TimeDelta elapsed_time = now - last_process_time_;
last_process_time_ = now;
if (elapsed_time > kMaxElapsedTime) {
@@ -334,9 +335,11 @@
return last_send_time_ + kCongestedPacketInterval;
}
- // If there are pending packets, check how long it will take until buffers
- // have emptied.
- if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
+ // Check how long until media buffer has drained. We schedule a call
+ // for when the last packet in the queue drains as otherwise we may
+ // be late in starting padding.
+ if (media_rate_ > DataRate::Zero() &&
+ (!packet_queue_.Empty() || !media_debt_.IsZero())) {
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + media_debt_ / media_rate_);
}
@@ -348,21 +351,38 @@
last_process_time_ + padding_debt_ / padding_rate_);
}
- return last_send_time_ + kPausedProcessInterval;
+ if (send_padding_if_silent_) {
+ return last_send_time_ + kPausedProcessInterval;
+ }
+ return last_process_time_ + kPausedProcessInterval;
}
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
- RTC_DCHECK_GE(now, last_process_time_);
Timestamp target_send_time = now;
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
if (target_send_time.IsMinusInfinity()) {
target_send_time = now;
- } else if (now + kMinSleepTime < target_send_time) {
+ } else if (now < target_send_time) {
// We are too early, abort and regroup!
return;
}
+
+ if (target_send_time < last_process_time_) {
+ // After the last process call, at time X, the target send time
+ // shifted to be earlier than X. This should normally not happen
+ // but we want to make sure rounding errors or erratic behavior
+ // of NextSendTime() does not cause issue. In particular, if the
+ // buffer reduction of
+ // rate * (target_send_time - previous_process_time)
+ // in the main loop doesn't clean up the existing debt we may not
+ // be able to send again. We don't want to check this reordering
+ // there as it is the normal exit condtion when the buffer is
+ // exhausted and there are packets in the queue.
+ UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
+ target_send_time = last_process_time_;
+ }
}
Timestamp previous_process_time = last_process_time_;
@@ -585,6 +605,7 @@
return nullptr;
}
} else {
+ // Dynamic processing mode.
if (now <= target_send_time) {
// We allow sending slightly early if we think that we would actually
// had been able to, had we been right on time - i.e. the current debt
@@ -593,11 +614,6 @@
if (now + flush_time > target_send_time) {
return nullptr;
}
- } else {
- // In dynamic mode we should never try get a non-probe packet until
- // the media debt is actually zero. Since there can be rounding errors,
- // allow some discrepancy.
- RTC_DCHECK_LE(media_debt_, media_rate_ * kMinSleepTime);
}
}
}
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index 5b5f6e7..9337ad2 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -732,33 +732,48 @@
EXPECT_LE((actual_pace_time - expected_pace_time).Abs(),
PacingController::kMinSleepTime);
- // Pacing media happens 2.5x factor, but padding was configured with 1.0x
+ // Pacing media happens at 2.5x, but padding was configured with 1.0x
// factor. We have to wait until the padding debt is gone before we start
// sending padding.
const TimeDelta time_to_padding_debt_free =
(expected_pace_time * kPaceMultiplier) - actual_pace_time;
- TimeDelta time_to_next = pacer_->NextSendTime() - clock_.CurrentTime();
- EXPECT_EQ(time_to_next, time_to_padding_debt_free);
- clock_.AdvanceTime(time_to_next);
+ clock_.AdvanceTime(time_to_padding_debt_free -
+ PacingController::kMinSleepTime);
+ pacer_->ProcessPackets();
// Send 10 padding packets.
const size_t kPaddingPacketsToSend = 10;
DataSize padding_sent = DataSize::Zero();
+ size_t packets_sent = 0;
+ Timestamp first_send_time = Timestamp::MinusInfinity();
+ Timestamp last_send_time = Timestamp::MinusInfinity();
+
EXPECT_CALL(callback_, SendPadding)
.Times(kPaddingPacketsToSend)
.WillRepeatedly([&](size_t target_size) {
- padding_sent += DataSize::bytes(target_size);
+ ++packets_sent;
+ if (packets_sent < kPaddingPacketsToSend) {
+ // Don't count bytes of last packet, instead just
+ // use this as the time the last packet finished
+ // sending.
+ padding_sent += DataSize::bytes(target_size);
+ }
+ if (first_send_time.IsInfinite()) {
+ first_send_time = clock_.CurrentTime();
+ } else {
+ last_send_time = clock_.CurrentTime();
+ }
return target_size;
});
EXPECT_CALL(callback_, SendPacket(_, _, _, false, true))
.Times(kPaddingPacketsToSend);
- const Timestamp padding_start_time = clock_.CurrentTime();
- for (size_t i = 0; i < kPaddingPacketsToSend; ++i) {
+
+ while (packets_sent < kPaddingPacketsToSend) {
AdvanceTimeAndProcess();
}
// Verify rate of sent padding.
- TimeDelta padding_duration = pacer_->NextSendTime() - padding_start_time;
+ TimeDelta padding_duration = last_send_time - first_send_time;
DataRate padding_rate = padding_sent / padding_duration;
EXPECT_EQ(padding_rate, kTargetRate);
}
@@ -781,15 +796,18 @@
SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
capture_time_ms, 250);
- EXPECT_CALL(callback_, SendPadding).WillOnce([](size_t padding) {
+ bool padding_sent = false;
+ EXPECT_CALL(callback_, SendPadding).WillOnce([&](size_t padding) {
+ padding_sent = true;
return padding;
});
EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1);
if (PeriodicProcess()) {
pacer_->ProcessPackets();
} else {
- AdvanceTimeAndProcess(); // Media.
- AdvanceTimeAndProcess(); // Padding.
+ while (!padding_sent) {
+ AdvanceTimeAndProcess();
+ }
}
}
@@ -1677,47 +1695,6 @@
}
}
-TEST_P(PacingControllerTest, TaskEarly) {
- if (PeriodicProcess()) {
- // This test applies only when NOT using interval budget.
- return;
- }
-
- // Set a low send rate to more easily test timing issues.
- DataRate kSendRate = DataRate::kbps(30);
- pacer_->SetPacingRates(kSendRate, DataRate::Zero());
-
- // Add two packets.
- pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo));
- pacer_->EnqueuePacket(BuildRtpPacket(RtpPacketToSend::Type::kVideo));
-
- // Process packets, only first should be sent.
- EXPECT_CALL(callback_, SendPacket).Times(1);
- pacer_->ProcessPackets();
-
- Timestamp next_send_time = pacer_->NextSendTime();
-
- // Packets won't be sent if we try process more than one sleep time early.
- ASSERT_GT(next_send_time - clock_.CurrentTime(),
- PacingController::kMinSleepTime);
- clock_.AdvanceTime(next_send_time - clock_.CurrentTime() -
- (PacingController::kMinSleepTime + TimeDelta::ms(1)));
-
- EXPECT_CALL(callback_, SendPacket).Times(0);
- pacer_->ProcessPackets();
-
- // Assume timing is accurate within +-100us due to rounding.
- const TimeDelta kErrorMargin = TimeDelta::us(100);
-
- // Check that next scheduled send time is still the same (within margin).
- EXPECT_LT((pacer_->NextSendTime() - next_send_time).Abs(), kErrorMargin);
-
- // Advance to within error margin for execution.
- clock_.AdvanceTime(TimeDelta::ms(1) + kErrorMargin);
- EXPECT_CALL(callback_, SendPacket).Times(1);
- pacer_->ProcessPackets();
-}
-
TEST_P(PacingControllerTest, TaskLate) {
if (PeriodicProcess()) {
// This test applies only when NOT using interval budget.