Extract most of PacedSender into PacedSendingController.
The Pacer now just handles interaction with Module/ProcessThread and
forwarding packets to PacketRouter.
All other logic is moved to PacedSendingController, including tests.
PacedSender unittest are now just some basic sanity tests.
Bug: webrtc:10809
Change-Id: I69223cd9d8300997375b03706d2e99c88e46241c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/149041
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28886}
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index ca33b12..bb39f1f 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -19,6 +19,8 @@
"bitrate_prober.h",
"paced_sender.cc",
"paced_sender.h",
+ "pacing_controller.cc",
+ "pacing_controller.h",
"packet_router.cc",
"packet_router.h",
"round_robin_packet_queue.cc",
@@ -75,11 +77,13 @@
"bitrate_prober_unittest.cc",
"interval_budget_unittest.cc",
"paced_sender_unittest.cc",
+ "pacing_controller_unittest.cc",
"packet_router_unittest.cc",
]
deps = [
":interval_budget",
":pacing",
+ "../../api/units:data_rate",
"../../api/units:time_delta",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 25a15a1..665b070 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -16,8 +16,6 @@
#include "absl/memory/memory.h"
#include "api/rtc_event_log/rtc_event_log.h"
-#include "modules/pacing/bitrate_prober.h"
-#include "modules/pacing/interval_budget.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
@@ -25,50 +23,6 @@
#include "system_wrappers/include/clock.h"
namespace webrtc {
-namespace {
-// Time limit in milliseconds between packet bursts.
-constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>();
-constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>();
-constexpr TimeDelta kPausedProcessInterval = kCongestedPacketInterval;
-constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>();
-
-// Upper cap on process interval, in case process has not been called in a long
-// time.
-constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>();
-
-bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
- absl::string_view key) {
- return field_trials.Lookup(key).find("Disabled") == 0;
-}
-
-bool IsEnabled(const WebRtcKeyValueConfig& field_trials,
- absl::string_view key) {
- return field_trials.Lookup(key).find("Enabled") == 0;
-}
-
-int GetPriorityForType(RtpPacketToSend::Type type) {
- switch (type) {
- case RtpPacketToSend::Type::kAudio:
- // Audio is always prioritized over other packet types.
- return 0;
- case RtpPacketToSend::Type::kRetransmission:
- // Send retransmissions before new media.
- return 1;
- case RtpPacketToSend::Type::kVideo:
- // Video has "normal" priority, in the old speak.
- return 2;
- case RtpPacketToSend::Type::kForwardErrorCorrection:
- // Send redundancy concurrently to video. If it is delayed it might have a
- // lower chance of being useful.
- return 2;
- case RtpPacketToSend::Type::kPadding:
- // Packets that are in themselves likely useless, only sent to keep the
- // BWE high.
- return 3;
- }
-}
-
-} // namespace
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@@ -76,60 +30,24 @@
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials)
- : clock_(clock),
+ : pacing_controller_(clock,
+ static_cast<PacingController::PacketSender*>(this),
+ event_log,
+ field_trials),
packet_router_(packet_router),
- fallback_field_trials_(
- !field_trials ? absl::make_unique<FieldTrialBasedConfig>() : nullptr),
- field_trials_(field_trials ? field_trials : fallback_field_trials_.get()),
- drain_large_queues_(
- !IsDisabled(*field_trials_, "WebRTC-Pacer-DrainQueue")),
- send_padding_if_silent_(
- IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")),
- pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")),
- min_packet_limit_(kDefaultMinPacketLimit),
- last_timestamp_(clock_->CurrentTime()),
- paused_(false),
- media_budget_(0),
- padding_budget_(0),
- prober_(*field_trials_),
- probing_send_failure_(false),
- pacing_bitrate_(DataRate::Zero()),
- time_last_process_(clock->CurrentTime()),
- last_send_time_(time_last_process_),
- packets_(time_last_process_, field_trials),
- packet_counter_(0),
- congestion_window_size_(DataSize::PlusInfinity()),
- outstanding_data_(DataSize::Zero()),
- process_thread_(nullptr),
- queue_time_limit(TimeDelta::ms(kMaxQueueLengthMs)),
- account_for_audio_(false),
- legacy_packet_referencing_(
- IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) {
- if (!drain_large_queues_) {
- RTC_LOG(LS_WARNING) << "Pacer queues will not be drained,"
- "pushback experiment must be enabled.";
- }
- FieldTrialParameter<int> min_packet_limit_ms("", min_packet_limit_.ms());
- ParseFieldTrial({&min_packet_limit_ms},
- field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs"));
- min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get());
- UpdateBudgetWithElapsedTime(min_packet_limit_);
-}
+ process_thread_(nullptr) {}
-PacedSender::~PacedSender() {}
+PacedSender::~PacedSender() = default;
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
rtc::CritScope cs(&critsect_);
- prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id);
+ return pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
}
void PacedSender::Pause() {
{
rtc::CritScope cs(&critsect_);
- if (!paused_)
- RTC_LOG(LS_INFO) << "PacedSender paused.";
- paused_ = true;
- packets_.SetPauseState(true, CurrentTime());
+ pacing_controller_.Pause();
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to get
@@ -141,10 +59,7 @@
void PacedSender::Resume() {
{
rtc::CritScope cs(&critsect_);
- if (paused_)
- RTC_LOG(LS_INFO) << "PacedSender resumed.";
- paused_ = false;
- packets_.SetPauseState(false, CurrentTime());
+ pacing_controller_.Resume();
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to
@@ -155,49 +70,22 @@
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
rtc::CritScope cs(&critsect_);
- congestion_window_size_ = congestion_window_size;
+ pacing_controller_.SetCongestionWindow(congestion_window_size);
}
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
rtc::CritScope cs(&critsect_);
- outstanding_data_ = outstanding_data;
-}
-
-bool PacedSender::Congested() const {
- if (congestion_window_size_.IsFinite()) {
- return outstanding_data_ >= congestion_window_size_;
- }
- return false;
-}
-
-Timestamp PacedSender::CurrentTime() const {
- Timestamp time = clock_->CurrentTime();
- if (time < last_timestamp_) {
- RTC_LOG(LS_WARNING)
- << "Non-monotonic clock behavior observed. Previous timestamp: "
- << last_timestamp_.ms() << ", new timestamp: " << time.ms();
- RTC_DCHECK_GE(time, last_timestamp_);
- time = last_timestamp_;
- }
- last_timestamp_ = time;
- return time;
+ pacing_controller_.UpdateOutstandingData(outstanding_data);
}
void PacedSender::SetProbingEnabled(bool enabled) {
rtc::CritScope cs(&critsect_);
- RTC_CHECK_EQ(0, packet_counter_);
- prober_.SetEnabled(enabled);
+ pacing_controller_.SetProbingEnabled(enabled);
}
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
rtc::CritScope cs(&critsect_);
- RTC_DCHECK_GT(pacing_rate, DataRate::Zero());
- pacing_bitrate_ = pacing_rate;
- padding_budget_.set_target_rate_kbps(padding_rate.kbps());
-
- RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
- << pacing_bitrate_.kbps()
- << " padding_budget_kbps=" << padding_rate.kbps();
+ pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
}
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
@@ -207,288 +95,69 @@
size_t bytes,
bool retransmission) {
rtc::CritScope cs(&critsect_);
- RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
- << "SetPacingRate must be called before InsertPacket.";
-
- Timestamp now = CurrentTime();
- prober_.OnIncomingPacket(bytes);
-
- if (capture_time_ms < 0)
- capture_time_ms = now.ms();
-
- RtpPacketToSend::Type type;
- switch (priority) {
- case RtpPacketSender::kHighPriority:
- type = RtpPacketToSend::Type::kAudio;
- break;
- case RtpPacketSender::kNormalPriority:
- type = RtpPacketToSend::Type::kRetransmission;
- break;
- default:
- type = RtpPacketToSend::Type::kVideo;
- }
- packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
- capture_time_ms, now, DataSize::bytes(bytes), retransmission,
- packet_counter_++);
+ pacing_controller_.InsertPacket(priority, ssrc, sequence_number,
+ capture_time_ms, bytes, retransmission);
}
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
rtc::CritScope cs(&critsect_);
- RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
- << "SetPacingRate must be called before InsertPacket.";
-
- Timestamp now = CurrentTime();
- prober_.OnIncomingPacket(packet->payload_size());
-
- if (packet->capture_time_ms() < 0) {
- packet->set_capture_time_ms(now.ms());
- }
-
- RTC_CHECK(packet->packet_type());
- int priority = GetPriorityForType(*packet->packet_type());
- packets_.Push(priority, now, packet_counter_++, std::move(packet));
+ pacing_controller_.EnqueuePacket(std::move(packet));
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
rtc::CritScope cs(&critsect_);
- account_for_audio_ = account_for_audio;
+ pacing_controller_.SetAccountForAudioPackets(account_for_audio);
}
TimeDelta PacedSender::ExpectedQueueTime() const {
rtc::CritScope cs(&critsect_);
- RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero());
- return TimeDelta::ms(
- (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) /
- pacing_bitrate_.bps());
+ return pacing_controller_.ExpectedQueueTime();
}
size_t PacedSender::QueueSizePackets() const {
rtc::CritScope cs(&critsect_);
- return packets_.SizeInPackets();
+ return pacing_controller_.QueueSizePackets();
}
DataSize PacedSender::QueueSizeData() const {
rtc::CritScope cs(&critsect_);
- return packets_.Size();
+ return pacing_controller_.QueueSizeData();
}
absl::optional<Timestamp> PacedSender::FirstSentPacketTime() const {
rtc::CritScope cs(&critsect_);
- return first_sent_packet_time_;
+ return pacing_controller_.FirstSentPacketTime();
}
TimeDelta PacedSender::OldestPacketWaitTime() const {
rtc::CritScope cs(&critsect_);
- Timestamp oldest_packet = packets_.OldestEnqueueTime();
- if (oldest_packet.IsInfinite()) {
- return TimeDelta::Zero();
- }
-
- return CurrentTime() - oldest_packet;
+ return pacing_controller_.OldestPacketWaitTime();
}
int64_t PacedSender::TimeUntilNextProcess() {
rtc::CritScope cs(&critsect_);
- TimeDelta elapsed_time = CurrentTime() - time_last_process_;
+
// When paused we wake up every 500 ms to send a padding packet to ensure
// we won't get stuck in the paused state due to no feedback being received.
- if (paused_) {
- return std::max(kPausedProcessInterval - elapsed_time, TimeDelta::Zero())
+ TimeDelta elapsed_time = pacing_controller_.TimeElapsedSinceLastProcess();
+ if (pacing_controller_.IsPaused()) {
+ return std::max(PacingController::kPausedProcessInterval - elapsed_time,
+ TimeDelta::Zero())
.ms();
}
- if (prober_.IsProbing()) {
- int64_t ret = prober_.TimeUntilNextProbe(CurrentTime().ms());
- if (ret > 0 || (ret == 0 && !probing_send_failure_))
- return ret;
+ auto next_probe = pacing_controller_.TimeUntilNextProbe();
+ if (next_probe) {
+ return next_probe->ms();
}
- return std::max(min_packet_limit_ - elapsed_time, TimeDelta::Zero()).ms();
-}
-TimeDelta PacedSender::UpdateTimeAndGetElapsed(Timestamp now) {
- TimeDelta elapsed_time = now - time_last_process_;
- time_last_process_ = now;
- if (elapsed_time > kMaxElapsedTime) {
- RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms()
- << " ms) longer than expected, limiting to "
- << kMaxElapsedTime.ms();
- elapsed_time = kMaxElapsedTime;
- }
- return elapsed_time;
-}
-
-bool PacedSender::ShouldSendKeepalive(Timestamp now) const {
- if (send_padding_if_silent_ || paused_ || Congested()) {
- // We send a padding packet every 500 ms to ensure we won't get stuck in
- // congested state due to no feedback being received.
- TimeDelta elapsed_since_last_send = now - last_send_time_;
- if (elapsed_since_last_send >= kCongestedPacketInterval) {
- // We can not send padding unless a normal packet has first been sent. If
- // we do, timestamps get messed up.
- if (packet_counter_ > 0) {
- return true;
- }
- }
- }
- return false;
+ const TimeDelta min_packet_limit = TimeDelta::ms(5);
+ return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero()).ms();
}
void PacedSender::Process() {
rtc::CritScope cs(&critsect_);
- Timestamp now = CurrentTime();
- TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
- if (ShouldSendKeepalive(now)) {
- if (legacy_packet_referencing_) {
- critsect_.Leave();
- size_t bytes_sent =
- packet_router_->TimeToSendPadding(1, PacedPacketInfo());
- critsect_.Enter();
- OnPaddingSent(DataSize::bytes(bytes_sent));
- } else {
- DataSize keepalive_data_sent = DataSize::Zero();
- critsect_.Leave();
- std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
- packet_router_->GeneratePadding(1);
- for (auto& packet : keepalive_packets) {
- keepalive_data_sent +=
- DataSize::bytes(packet->payload_size() + packet->padding_size());
- packet_router_->SendPacket(std::move(packet), PacedPacketInfo());
- }
- critsect_.Enter();
- OnPaddingSent(keepalive_data_sent);
- }
- }
-
- if (paused_)
- return;
-
- if (elapsed_time > TimeDelta::Zero()) {
- DataRate target_rate = pacing_bitrate_;
- DataSize queue_size_data = packets_.Size();
- if (queue_size_data > DataSize::Zero()) {
- // Assuming equal size packets and input/output rate, the average packet
- // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
- // time constraint shall be met. Determine bitrate needed for that.
- packets_.UpdateQueueTime(CurrentTime());
- if (drain_large_queues_) {
- TimeDelta avg_time_left = std::max(
- TimeDelta::ms(1), queue_time_limit - packets_.AverageQueueTime());
- DataRate min_rate_needed = queue_size_data / avg_time_left;
- if (min_rate_needed > target_rate) {
- target_rate = min_rate_needed;
- RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
- << target_rate.kbps();
- }
- }
- }
-
- media_budget_.set_target_rate_kbps(target_rate.kbps());
- UpdateBudgetWithElapsedTime(elapsed_time);
- }
-
- bool is_probing = prober_.IsProbing();
- PacedPacketInfo pacing_info;
- absl::optional<DataSize> recommended_probe_size;
- if (is_probing) {
- pacing_info = prober_.CurrentCluster();
- recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
- }
-
- DataSize data_sent = DataSize::Zero();
- // The paused state is checked in the loop since it leaves the critical
- // section allowing the paused state to be changed from other code.
- while (!paused_) {
- auto* packet = GetPendingPacket(pacing_info);
- if (packet == nullptr) {
- // No packet available to send, check if we should send padding.
- if (!legacy_packet_referencing_) {
- DataSize padding_to_add =
- PaddingToAdd(recommended_probe_size, data_sent);
- if (padding_to_add > DataSize::Zero()) {
- critsect_.Leave();
- std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
- packet_router_->GeneratePadding(padding_to_add.bytes());
- critsect_.Enter();
- if (padding_packets.empty()) {
- // No padding packets were generated, quite send loop.
- break;
- }
- for (auto& packet : padding_packets) {
- EnqueuePacket(std::move(packet));
- }
- // Continue loop to send the padding that was just added.
- continue;
- }
- }
-
- // Can't fetch new packet and no padding to send, exit send loop.
- break;
- }
-
- std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
- const bool owned_rtp_packet = rtp_packet != nullptr;
- RtpPacketSendResult success;
-
- if (rtp_packet != nullptr) {
- critsect_.Leave();
- packet_router_->SendPacket(std::move(rtp_packet), pacing_info);
- critsect_.Enter();
- success = RtpPacketSendResult::kSuccess;
- } else {
- critsect_.Leave();
- success = packet_router_->TimeToSendPacket(
- packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
- packet->is_retransmission(), pacing_info);
- critsect_.Enter();
- }
-
- if (success == RtpPacketSendResult::kSuccess ||
- success == RtpPacketSendResult::kPacketNotFound) {
- // Packet sent or invalid packet, remove it from queue.
- // TODO(webrtc:8052): Don't consume media budget on kInvalid.
- data_sent += packet->size();
- // Send succeeded, remove it from the queue.
- OnPacketSent(packet);
- if (recommended_probe_size && data_sent > *recommended_probe_size)
- break;
- } else if (owned_rtp_packet) {
- // Send failed, but we can't put it back in the queue, remove it without
- // consuming budget.
- packets_.FinalizePop();
- break;
- } else {
- // Send failed, put it back into the queue.
- packets_.CancelPop();
- break;
- }
- }
-
- if (legacy_packet_referencing_ && packets_.Empty() && !Congested()) {
- // We can not send padding unless a normal packet has first been sent. If we
- // do, timestamps get messed up.
- if (packet_counter_ > 0) {
- DataSize padding_needed =
- (recommended_probe_size && *recommended_probe_size > data_sent)
- ? (*recommended_probe_size - data_sent)
- : DataSize::bytes(padding_budget_.bytes_remaining());
- if (padding_needed > DataSize::Zero()) {
- DataSize padding_sent = DataSize::Zero();
- critsect_.Leave();
- padding_sent = DataSize::bytes(packet_router_->TimeToSendPadding(
- padding_needed.bytes(), pacing_info));
- critsect_.Enter();
- data_sent += padding_sent;
- OnPaddingSent(padding_sent);
- }
- }
- }
-
- if (is_probing) {
- probing_send_failure_ = data_sent == DataSize::Zero();
- if (!probing_send_failure_) {
- prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
- }
- }
+ pacing_controller_.ProcessPackets();
}
void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
@@ -497,93 +166,49 @@
process_thread_ = process_thread;
}
-DataSize PacedSender::PaddingToAdd(
- absl::optional<DataSize> recommended_probe_size,
- DataSize data_sent) {
- if (!packets_.Empty()) {
- // Actual payload available, no need to add padding.
- return DataSize::Zero();
- }
-
- if (Congested()) {
- // Don't add padding if congested, even if requested for probing.
- return DataSize::Zero();
- }
-
- if (packet_counter_ == 0) {
- // We can not send padding unless a normal packet has first been sent. If we
- // do, timestamps get messed up.
- return DataSize::Zero();
- }
-
- if (recommended_probe_size) {
- if (*recommended_probe_size > data_sent) {
- return *recommended_probe_size - data_sent;
- }
- return DataSize::Zero();
- }
-
- return DataSize::bytes(padding_budget_.bytes_remaining());
-}
-
-RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
- const PacedPacketInfo& pacing_info) {
- if (packets_.Empty()) {
- return nullptr;
- }
-
- // Since we need to release the lock in order to send, we first pop the
- // element from the priority queue but keep it in storage, so that we can
- // reinsert it if send fails.
- RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop();
- bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
- bool apply_pacing = !audio_packet || pace_audio_;
- if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
- pacing_info.probe_cluster_id ==
- PacedPacketInfo::kNotAProbe))) {
- packets_.CancelPop();
- return nullptr;
- }
- return packet;
-}
-
-void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
- Timestamp now = CurrentTime();
- if (!first_sent_packet_time_) {
- first_sent_packet_time_ = now;
- }
- bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
- if (!audio_packet || account_for_audio_) {
- // Update media bytes sent.
- UpdateBudgetWithSentData(packet->size());
- last_send_time_ = now;
- }
- // Send succeeded, remove it from the queue.
- packets_.FinalizePop();
-}
-
-void PacedSender::OnPaddingSent(DataSize data_sent) {
- if (data_sent > DataSize::Zero()) {
- UpdateBudgetWithSentData(data_sent);
- }
- last_send_time_ = CurrentTime();
-}
-
-void PacedSender::UpdateBudgetWithElapsedTime(TimeDelta delta) {
- delta = std::min(kMaxProcessingInterval, delta);
- media_budget_.IncreaseBudget(delta.ms());
- padding_budget_.IncreaseBudget(delta.ms());
-}
-
-void PacedSender::UpdateBudgetWithSentData(DataSize size) {
- outstanding_data_ += size;
- media_budget_.UseBudget(size.bytes());
- padding_budget_.UseBudget(size.bytes());
-}
-
void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
rtc::CritScope cs(&critsect_);
- queue_time_limit = limit;
+ pacing_controller_.SetQueueTimeLimit(limit);
+}
+
+void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ critsect_.Leave();
+ packet_router_->SendPacket(std::move(packet), cluster_info);
+ critsect_.Enter();
+}
+
+std::vector<std::unique_ptr<RtpPacketToSend>> PacedSender::GeneratePadding(
+ DataSize size) {
+ std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
+ critsect_.Leave();
+ padding_packets = packet_router_->GeneratePadding(size.bytes());
+ critsect_.Enter();
+ return padding_packets;
+}
+
+RtpPacketSendResult PacedSender::TimeToSendPacket(
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission,
+ const PacedPacketInfo& packet_info) {
+ RtpPacketSendResult result;
+ critsect_.Leave();
+ result = packet_router_->TimeToSendPacket(
+ ssrc, sequence_number, capture_timestamp, retransmission, packet_info);
+ critsect_.Enter();
+ return result;
+}
+
+DataSize PacedSender::TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& pacing_info) {
+ size_t padding_bytes_sent;
+ critsect_.Leave();
+ padding_bytes_sent =
+ packet_router_->TimeToSendPadding(size.bytes(), pacing_info);
+ critsect_.Enter();
+ return DataSize::bytes(padding_bytes_sent);
}
} // namespace webrtc
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 07c249f..71e826d 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -16,6 +16,7 @@
#include <atomic>
#include <memory>
+#include <vector>
#include "absl/types/optional.h"
#include "api/function_view.h"
@@ -25,14 +26,13 @@
#include "modules/include/module.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
+#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/packet_router.h"
-#include "modules/pacing/round_robin_packet_queue.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/critical_section.h"
-#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@@ -41,7 +41,8 @@
class PacedSender : public Module,
public RtpPacketPacer,
- public RtpPacketSender {
+ public RtpPacketSender,
+ private PacingController::PacketSender {
public:
// Expected max pacer delay in ms. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
@@ -116,6 +117,7 @@
// Below are methods specific to this implementation, such as things related
// to module processing thread specifics or methods exposed for test.
+ // TODO(bugs.webrtc.org/10809): Remove when cleanup up unit tests.
// Enable bitrate probing. Enabled by default, mostly here to simplify
// testing. Must be called before any packets are being sent to have an
// effect.
@@ -134,69 +136,30 @@
void ProcessThreadAttached(ProcessThread* process_thread) override;
private:
- TimeDelta UpdateTimeAndGetElapsed(Timestamp now)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- bool ShouldSendKeepalive(Timestamp now) const
+ // Methods implementing PacedSenderController:PacketSender.
+
+ void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) override
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- // Updates the number of bytes that can be sent for the next time interval.
- void UpdateBudgetWithElapsedTime(TimeDelta delta)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- void UpdateBudgetWithSentData(DataSize size)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize size) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- DataSize PaddingToAdd(absl::optional<DataSize> recommended_probe_size,
- DataSize data_sent)
+ // TODO(bugs.webrtc.org/10633): Remove these when old code path is gone.
+ RtpPacketSendResult TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission,
+ const PacedPacketInfo& packet_info)
+ override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
+ DataSize TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& pacing_info) override
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
- const PacedPacketInfo& pacing_info)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- void OnPaddingSent(DataSize padding_sent)
- RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-
- bool Congested() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- Timestamp CurrentTime() const RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-
- Clock* const clock_;
- PacketRouter* const packet_router_;
- const std::unique_ptr<FieldTrialBasedConfig> fallback_field_trials_;
- const WebRtcKeyValueConfig* field_trials_;
-
- const bool drain_large_queues_;
- const bool send_padding_if_silent_;
- const bool pace_audio_;
- TimeDelta min_packet_limit_;
-
rtc::CriticalSection critsect_;
- // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.
- // The last millisecond timestamp returned by |clock_|.
- mutable Timestamp last_timestamp_ RTC_GUARDED_BY(critsect_);
- bool paused_ RTC_GUARDED_BY(critsect_);
- // This is the media budget, keeping track of how many bits of media
- // we can pace out during the current interval.
- IntervalBudget media_budget_ RTC_GUARDED_BY(critsect_);
- // This is the padding budget, keeping track of how many bits of padding we're
- // allowed to send out during the current interval. This budget will be
- // utilized when there's no media to send.
- IntervalBudget padding_budget_ RTC_GUARDED_BY(critsect_);
+ PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
- BitrateProber prober_ RTC_GUARDED_BY(critsect_);
- bool probing_send_failure_ RTC_GUARDED_BY(critsect_);
-
- DataRate pacing_bitrate_ RTC_GUARDED_BY(critsect_);
-
- Timestamp time_last_process_ RTC_GUARDED_BY(critsect_);
- Timestamp last_send_time_ RTC_GUARDED_BY(critsect_);
- absl::optional<Timestamp> first_sent_packet_time_ RTC_GUARDED_BY(critsect_);
-
- RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_);
- uint64_t packet_counter_ RTC_GUARDED_BY(critsect_);
-
- DataSize congestion_window_size_ RTC_GUARDED_BY(critsect_);
- DataSize outstanding_data_ RTC_GUARDED_BY(critsect_);
+ PacketRouter* const packet_router_;
// Lock to avoid race when attaching process thread. This can happen due to
// the Call class setting network state on RtpTransportControllerSend, which
@@ -205,14 +168,6 @@
// queue separate from the thread used by Call, this causes a race.
rtc::CriticalSection process_thread_lock_;
ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_);
-
- TimeDelta queue_time_limit RTC_GUARDED_BY(critsect_);
- bool account_for_audio_ RTC_GUARDED_BY(critsect_);
-
- // If true, PacedSender should only reference packets as in legacy mode.
- // If false, PacedSender may have direct ownership of RtpPacketToSend objects.
- // Defaults to true, will be changed to default false soon.
- const bool legacy_packet_referencing_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACED_SENDER_H_
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
new file mode 100644
index 0000000..233a3fa
--- /dev/null
+++ b/modules/pacing/pacing_controller.cc
@@ -0,0 +1,552 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/pacing_controller.h"
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
+#include "absl/memory/memory.h"
+#include "modules/pacing/bitrate_prober.h"
+#include "modules/pacing/interval_budget.h"
+#include "modules/utility/include/process_thread.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/time_utils.h"
+#include "system_wrappers/include/clock.h"
+
+namespace webrtc {
+namespace {
+// Time limit in milliseconds between packet bursts.
+constexpr TimeDelta kDefaultMinPacketLimit = TimeDelta::Millis<5>();
+constexpr TimeDelta kCongestedPacketInterval = TimeDelta::Millis<500>();
+constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds<2>();
+
+// Upper cap on process interval, in case process has not been called in a long
+// time.
+constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis<30>();
+
+bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
+ absl::string_view key) {
+ return field_trials.Lookup(key).find("Disabled") == 0;
+}
+
+bool IsEnabled(const WebRtcKeyValueConfig& field_trials,
+ absl::string_view key) {
+ return field_trials.Lookup(key).find("Enabled") == 0;
+}
+
+int GetPriorityForType(RtpPacketToSend::Type type) {
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ // Audio is always prioritized over other packet types.
+ return 0;
+ case RtpPacketToSend::Type::kRetransmission:
+ // Send retransmissions before new media.
+ return 1;
+ case RtpPacketToSend::Type::kVideo:
+ // Video has "normal" priority, in the old speak.
+ return 2;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ // Send redundancy concurrently to video. If it is delayed it might have a
+ // lower chance of being useful.
+ return 2;
+ case RtpPacketToSend::Type::kPadding:
+ // Packets that are in themselves likely useless, only sent to keep the
+ // BWE high.
+ return 3;
+ }
+}
+
+} // namespace
+
+const TimeDelta PacingController::kMaxExpectedQueueLength =
+ TimeDelta::Millis<2000>();
+const float PacingController::kDefaultPaceMultiplier = 2.5f;
+const TimeDelta PacingController::kPausedProcessInterval =
+ kCongestedPacketInterval;
+
+PacingController::PacingController(Clock* clock,
+ PacketSender* packet_sender,
+ RtcEventLog* event_log,
+ const WebRtcKeyValueConfig* field_trials)
+ : clock_(clock),
+ packet_sender_(packet_sender),
+ fallback_field_trials_(
+ !field_trials ? absl::make_unique<FieldTrialBasedConfig>() : nullptr),
+ field_trials_(field_trials ? field_trials : fallback_field_trials_.get()),
+ drain_large_queues_(
+ !IsDisabled(*field_trials_, "WebRTC-Pacer-DrainQueue")),
+ send_padding_if_silent_(
+ IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")),
+ pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")),
+ min_packet_limit_(kDefaultMinPacketLimit),
+ last_timestamp_(clock_->CurrentTime()),
+ paused_(false),
+ media_budget_(0),
+ padding_budget_(0),
+ prober_(*field_trials_),
+ probing_send_failure_(false),
+ padding_failure_state_(false),
+ pacing_bitrate_(DataRate::Zero()),
+ time_last_process_(clock->CurrentTime()),
+ last_send_time_(time_last_process_),
+ packet_queue_(time_last_process_, field_trials),
+ packet_counter_(0),
+ congestion_window_size_(DataSize::PlusInfinity()),
+ outstanding_data_(DataSize::Zero()),
+ queue_time_limit(kMaxExpectedQueueLength),
+ account_for_audio_(false),
+ legacy_packet_referencing_(
+ IsEnabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) {
+ if (!drain_large_queues_) {
+ RTC_LOG(LS_WARNING) << "Pacer queues will not be drained,"
+ "pushback experiment must be enabled.";
+ }
+ FieldTrialParameter<int> min_packet_limit_ms("", min_packet_limit_.ms());
+ ParseFieldTrial({&min_packet_limit_ms},
+ field_trials_->Lookup("WebRTC-Pacer-MinPacketLimitMs"));
+ min_packet_limit_ = TimeDelta::ms(min_packet_limit_ms.Get());
+ UpdateBudgetWithElapsedTime(min_packet_limit_);
+}
+
+PacingController::~PacingController() = default;
+
+void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) {
+ prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id);
+}
+
+void PacingController::Pause() {
+ if (!paused_)
+ RTC_LOG(LS_INFO) << "PacedSender paused.";
+ paused_ = true;
+ packet_queue_.SetPauseState(true, CurrentTime());
+}
+
+void PacingController::Resume() {
+ if (paused_)
+ RTC_LOG(LS_INFO) << "PacedSender resumed.";
+ paused_ = false;
+ packet_queue_.SetPauseState(false, CurrentTime());
+}
+
+bool PacingController::IsPaused() const {
+ return paused_;
+}
+
+void PacingController::SetCongestionWindow(DataSize congestion_window_size) {
+ congestion_window_size_ = congestion_window_size;
+}
+
+void PacingController::UpdateOutstandingData(DataSize outstanding_data) {
+ outstanding_data_ = outstanding_data;
+}
+
+bool PacingController::Congested() const {
+ if (congestion_window_size_.IsFinite()) {
+ return outstanding_data_ >= congestion_window_size_;
+ }
+ return false;
+}
+
+Timestamp PacingController::CurrentTime() const {
+ Timestamp time = clock_->CurrentTime();
+ if (time < last_timestamp_) {
+ RTC_LOG(LS_WARNING)
+ << "Non-monotonic clock behavior observed. Previous timestamp: "
+ << last_timestamp_.ms() << ", new timestamp: " << time.ms();
+ RTC_DCHECK_GE(time, last_timestamp_);
+ time = last_timestamp_;
+ }
+ last_timestamp_ = time;
+ return time;
+}
+
+void PacingController::SetProbingEnabled(bool enabled) {
+ RTC_CHECK_EQ(0, packet_counter_);
+ prober_.SetEnabled(enabled);
+}
+
+void PacingController::SetPacingRates(DataRate pacing_rate,
+ DataRate padding_rate) {
+ RTC_DCHECK_GT(pacing_rate, DataRate::Zero());
+ pacing_bitrate_ = pacing_rate;
+ padding_budget_.set_target_rate_kbps(padding_rate.kbps());
+
+ RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
+ << pacing_bitrate_.kbps()
+ << " padding_budget_kbps=" << padding_rate.kbps();
+}
+
+void PacingController::InsertPacket(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t bytes,
+ bool retransmission) {
+ RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
+ << "SetPacingRate must be called before InsertPacket.";
+
+ Timestamp now = CurrentTime();
+ prober_.OnIncomingPacket(bytes);
+
+ if (capture_time_ms < 0)
+ capture_time_ms = now.ms();
+
+ RtpPacketToSend::Type type;
+ switch (priority) {
+ case RtpPacketSender::kHighPriority:
+ type = RtpPacketToSend::Type::kAudio;
+ break;
+ case RtpPacketSender::kNormalPriority:
+ type = RtpPacketToSend::Type::kRetransmission;
+ break;
+ default:
+ type = RtpPacketToSend::Type::kVideo;
+ }
+ packet_queue_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
+ capture_time_ms, now, DataSize::bytes(bytes),
+ retransmission, packet_counter_++);
+}
+
+void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
+ RTC_DCHECK(pacing_bitrate_ > DataRate::Zero())
+ << "SetPacingRate must be called before InsertPacket.";
+
+ Timestamp now = CurrentTime();
+ prober_.OnIncomingPacket(packet->payload_size());
+
+ if (packet->capture_time_ms() < 0) {
+ packet->set_capture_time_ms(now.ms());
+ }
+
+ RTC_CHECK(packet->packet_type());
+ int priority = GetPriorityForType(*packet->packet_type());
+ packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
+}
+
+void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
+ account_for_audio_ = account_for_audio;
+}
+
+TimeDelta PacingController::ExpectedQueueTime() const {
+ RTC_DCHECK_GT(pacing_bitrate_, DataRate::Zero());
+ return TimeDelta::ms(
+ (QueueSizeData().bytes() * 8 * rtc::kNumMillisecsPerSec) /
+ pacing_bitrate_.bps());
+}
+
+size_t PacingController::QueueSizePackets() const {
+ return packet_queue_.SizeInPackets();
+}
+
+DataSize PacingController::QueueSizeData() const {
+ return packet_queue_.Size();
+}
+
+absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
+ return first_sent_packet_time_;
+}
+
+TimeDelta PacingController::OldestPacketWaitTime() const {
+ Timestamp oldest_packet = packet_queue_.OldestEnqueueTime();
+ if (oldest_packet.IsInfinite()) {
+ return TimeDelta::Zero();
+ }
+
+ return CurrentTime() - oldest_packet;
+}
+
+TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
+ TimeDelta elapsed_time = now - time_last_process_;
+ time_last_process_ = now;
+ if (elapsed_time > kMaxElapsedTime) {
+ RTC_LOG(LS_WARNING) << "Elapsed time (" << elapsed_time.ms()
+ << " ms) longer than expected, limiting to "
+ << kMaxElapsedTime.ms();
+ elapsed_time = kMaxElapsedTime;
+ }
+ return elapsed_time;
+}
+
+bool PacingController::ShouldSendKeepalive(Timestamp now) const {
+ if (send_padding_if_silent_ || paused_ || Congested()) {
+ // We send a padding packet every 500 ms to ensure we won't get stuck in
+ // congested state due to no feedback being received.
+ TimeDelta elapsed_since_last_send = now - last_send_time_;
+ if (elapsed_since_last_send >= kCongestedPacketInterval) {
+ // We can not send padding unless a normal packet has first been sent. If
+ // we do, timestamps get messed up.
+ if (packet_counter_ > 0) {
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
+absl::optional<TimeDelta> PacingController::TimeUntilNextProbe() {
+ if (!prober_.IsProbing()) {
+ return absl::nullopt;
+ }
+
+ TimeDelta time_delta =
+ TimeDelta::ms(prober_.TimeUntilNextProbe(CurrentTime().ms()));
+ if (time_delta > TimeDelta::Zero() ||
+ (time_delta == TimeDelta::Zero() && !probing_send_failure_)) {
+ return time_delta;
+ }
+
+ return absl::nullopt;
+}
+
+TimeDelta PacingController::TimeElapsedSinceLastProcess() const {
+ return CurrentTime() - time_last_process_;
+}
+
+void PacingController::ProcessPackets() {
+ Timestamp now = CurrentTime();
+ TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
+ if (ShouldSendKeepalive(now)) {
+ if (legacy_packet_referencing_) {
+ OnPaddingSent(packet_sender_->TimeToSendPadding(DataSize::bytes(1),
+ PacedPacketInfo()));
+ } else {
+ DataSize keepalive_data_sent = DataSize::Zero();
+ std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
+ packet_sender_->GeneratePadding(DataSize::bytes(1));
+ for (auto& packet : keepalive_packets) {
+ keepalive_data_sent +=
+ DataSize::bytes(packet->payload_size() + packet->padding_size());
+ packet_sender_->SendRtpPacket(std::move(packet), PacedPacketInfo());
+ }
+ OnPaddingSent(keepalive_data_sent);
+ }
+ }
+
+ if (paused_)
+ return;
+
+ if (elapsed_time > TimeDelta::Zero()) {
+ DataRate target_rate = pacing_bitrate_;
+ DataSize queue_size_data = packet_queue_.Size();
+ if (queue_size_data > DataSize::Zero()) {
+ // Assuming equal size packets and input/output rate, the average packet
+ // has avg_time_left_ms left to get queue_size_bytes out of the queue, if
+ // time constraint shall be met. Determine bitrate needed for that.
+ packet_queue_.UpdateQueueTime(CurrentTime());
+ if (drain_large_queues_) {
+ TimeDelta avg_time_left =
+ std::max(TimeDelta::ms(1),
+ queue_time_limit - packet_queue_.AverageQueueTime());
+ DataRate min_rate_needed = queue_size_data / avg_time_left;
+ if (min_rate_needed > target_rate) {
+ target_rate = min_rate_needed;
+ RTC_LOG(LS_VERBOSE) << "bwe:large_pacing_queue pacing_rate_kbps="
+ << target_rate.kbps();
+ }
+ }
+ }
+
+ media_budget_.set_target_rate_kbps(target_rate.kbps());
+ UpdateBudgetWithElapsedTime(elapsed_time);
+ }
+
+ bool is_probing = prober_.IsProbing();
+ PacedPacketInfo pacing_info;
+ absl::optional<DataSize> recommended_probe_size;
+ if (is_probing) {
+ pacing_info = prober_.CurrentCluster();
+ recommended_probe_size = DataSize::bytes(prober_.RecommendedMinProbeSize());
+ }
+
+ DataSize data_sent = DataSize::Zero();
+ // The paused state is checked in the loop since it leaves the critical
+ // section allowing the paused state to be changed from other code.
+ while (!paused_) {
+ auto* packet = GetPendingPacket(pacing_info);
+ if (packet == nullptr) {
+ // No packet available to send, check if we should send padding.
+ if (!legacy_packet_referencing_) {
+ DataSize padding_to_add =
+ PaddingToAdd(recommended_probe_size, data_sent);
+ if (padding_to_add > DataSize::Zero()) {
+ std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
+ packet_sender_->GeneratePadding(padding_to_add);
+ if (padding_packets.empty()) {
+ // No padding packets were generated, quite send loop.
+ break;
+ }
+ for (auto& packet : padding_packets) {
+ EnqueuePacket(std::move(packet));
+ }
+ // Continue loop to send the padding that was just added.
+ continue;
+ }
+ }
+
+ // Can't fetch new packet and no padding to send, exit send loop.
+ break;
+ }
+
+ std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
+ const bool owned_rtp_packet = rtp_packet != nullptr;
+ RtpPacketSendResult success;
+
+ if (rtp_packet != nullptr) {
+ packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
+ success = RtpPacketSendResult::kSuccess;
+ } else {
+ success = packet_sender_->TimeToSendPacket(
+ packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
+ packet->is_retransmission(), pacing_info);
+ }
+
+ if (success == RtpPacketSendResult::kSuccess ||
+ success == RtpPacketSendResult::kPacketNotFound) {
+ // Packet sent or invalid packet, remove it from queue.
+ // TODO(webrtc:8052): Don't consume media budget on kInvalid.
+ data_sent += packet->size();
+ // Send succeeded, remove it from the queue.
+ OnPacketSent(packet);
+ if (recommended_probe_size && data_sent > *recommended_probe_size)
+ break;
+ } else if (owned_rtp_packet) {
+ // Send failed, but we can't put it back in the queue, remove it without
+ // consuming budget.
+ packet_queue_.FinalizePop();
+ break;
+ } else {
+ // Send failed, put it back into the queue.
+ packet_queue_.CancelPop();
+ break;
+ }
+ }
+
+ if (legacy_packet_referencing_ && packet_queue_.Empty() && !Congested()) {
+ // We can not send padding unless a normal packet has first been sent. If we
+ // do, timestamps get messed up.
+ if (packet_counter_ > 0) {
+ DataSize padding_needed =
+ (recommended_probe_size && *recommended_probe_size > data_sent)
+ ? (*recommended_probe_size - data_sent)
+ : DataSize::bytes(padding_budget_.bytes_remaining());
+ if (padding_needed > DataSize::Zero()) {
+ DataSize padding_sent = DataSize::Zero();
+ padding_sent =
+ packet_sender_->TimeToSendPadding(padding_needed, pacing_info);
+ data_sent += padding_sent;
+ OnPaddingSent(padding_sent);
+ }
+ }
+ }
+
+ if (is_probing) {
+ probing_send_failure_ = data_sent == DataSize::Zero();
+ if (!probing_send_failure_) {
+ prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
+ }
+ }
+}
+
+DataSize PacingController::PaddingToAdd(
+ absl::optional<DataSize> recommended_probe_size,
+ DataSize data_sent) {
+ if (!packet_queue_.Empty()) {
+ // Actual payload available, no need to add padding.
+ return DataSize::Zero();
+ }
+
+ if (Congested()) {
+ // Don't add padding if congested, even if requested for probing.
+ return DataSize::Zero();
+ }
+
+ if (packet_counter_ == 0) {
+ // We can not send padding unless a normal packet has first been sent. If we
+ // do, timestamps get messed up.
+ return DataSize::Zero();
+ }
+
+ if (recommended_probe_size) {
+ if (*recommended_probe_size > data_sent) {
+ return *recommended_probe_size - data_sent;
+ }
+ return DataSize::Zero();
+ }
+
+ return DataSize::bytes(padding_budget_.bytes_remaining());
+}
+
+RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
+ const PacedPacketInfo& pacing_info) {
+ if (packet_queue_.Empty()) {
+ return nullptr;
+ }
+
+ // Since we need to release the lock in order to send, we first pop the
+ // element from the priority queue but keep it in storage, so that we can
+ // reinsert it if send fails.
+ RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
+ bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
+ bool apply_pacing = !audio_packet || pace_audio_;
+ if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
+ pacing_info.probe_cluster_id ==
+ PacedPacketInfo::kNotAProbe))) {
+ packet_queue_.CancelPop();
+ return nullptr;
+ }
+ return packet;
+}
+
+void PacingController::OnPacketSent(
+ RoundRobinPacketQueue::QueuedPacket* packet) {
+ Timestamp now = CurrentTime();
+ if (!first_sent_packet_time_) {
+ first_sent_packet_time_ = now;
+ }
+ bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
+ if (!audio_packet || account_for_audio_) {
+ // Update media bytes sent.
+ UpdateBudgetWithSentData(packet->size());
+ last_send_time_ = now;
+ }
+ // Send succeeded, remove it from the queue.
+ packet_queue_.FinalizePop();
+ padding_failure_state_ = false;
+}
+
+void PacingController::OnPaddingSent(DataSize data_sent) {
+ if (data_sent > DataSize::Zero()) {
+ UpdateBudgetWithSentData(data_sent);
+ } else {
+ padding_failure_state_ = true;
+ }
+ last_send_time_ = CurrentTime();
+}
+
+void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
+ delta = std::min(kMaxProcessingInterval, delta);
+ media_budget_.IncreaseBudget(delta.ms());
+ padding_budget_.IncreaseBudget(delta.ms());
+}
+
+void PacingController::UpdateBudgetWithSentData(DataSize size) {
+ outstanding_data_ += size;
+ media_budget_.UseBudget(size.bytes());
+ padding_budget_.UseBudget(size.bytes());
+}
+
+void PacingController::SetQueueTimeLimit(TimeDelta limit) {
+ queue_time_limit = limit;
+}
+
+} // namespace webrtc
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
new file mode 100644
index 0000000..0948616
--- /dev/null
+++ b/modules/pacing/pacing_controller.h
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_PACING_PACING_CONTROLLER_H_
+#define MODULES_PACING_PACING_CONTROLLER_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <atomic>
+#include <memory>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/function_view.h"
+#include "api/rtc_event_log/rtc_event_log.h"
+#include "api/transport/field_trial_based_config.h"
+#include "api/transport/network_types.h"
+#include "api/transport/webrtc_key_value_config.h"
+#include "modules/pacing/bitrate_prober.h"
+#include "modules/pacing/interval_budget.h"
+#include "modules/pacing/round_robin_packet_queue.h"
+#include "modules/pacing/rtp_packet_pacer.h"
+#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/experiments/field_trial_parser.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+
+// This class implements a leaky-buck packet pacing algorithm. It handles the
+// logic of determining which packets to send when, but the actual timing of
+// the processing is done externally (e.g. PacedSender). Furthermore, the
+// forwarding of packets when they are ready to be sent is also handled
+// externally, via the PacedSendingController::PacketSender interface.
+//
+class PacingController {
+ public:
+ class PacketSender {
+ public:
+ virtual ~PacketSender() = default;
+ virtual void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) = 0;
+ virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize size) = 0;
+
+ // TODO(bugs.webrtc.org/10633): Remove these when old code path is gone.
+ virtual RtpPacketSendResult TimeToSendPacket(
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission,
+ const PacedPacketInfo& packet_info) = 0;
+ virtual DataSize TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& pacing_info) = 0;
+ };
+
+ // Expected max pacer delay. If ExpectedQueueTime() is higher than
+ // this value, the packet producers should wait (eg drop frames rather than
+ // encoding them). Bitrate sent may temporarily exceed target set by
+ // UpdateBitrate() so that this limit will be upheld.
+ static const TimeDelta kMaxExpectedQueueLength;
+ // Pacing-rate relative to our target send rate.
+ // Multiplicative factor that is applied to the target bitrate to calculate
+ // the number of bytes that can be transmitted per interval.
+ // Increasing this factor will result in lower delays in cases of bitrate
+ // overshoots from the encoder.
+ static const float kDefaultPaceMultiplier;
+ // If no media or paused, wake up at least every |kPausedProcessIntervalMs| in
+ // order to send a keep-alive packet so we don't get stuck in a bad state due
+ // to lack of feedback.
+ static const TimeDelta kPausedProcessInterval;
+
+ PacingController(Clock* clock,
+ PacketSender* packet_sender,
+ RtcEventLog* event_log,
+ const WebRtcKeyValueConfig* field_trials);
+
+ ~PacingController();
+
+ // Adds the packet information to the queue and calls TimeToSendPacket
+ // when it's time to send.
+ void InsertPacket(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t bytes,
+ bool retransmission);
+ // Adds the packet to the queue and calls PacketRouter::SendPacket() when
+ // it's time to send.
+ void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet);
+
+ void CreateProbeCluster(DataRate bitrate, int cluster_id);
+
+ void Pause(); // Temporarily pause all sending.
+ void Resume(); // Resume sending packets.
+ bool IsPaused() const;
+
+ void SetCongestionWindow(DataSize congestion_window_size);
+ void UpdateOutstandingData(DataSize outstanding_data);
+
+ // Sets the pacing rates. Must be called once before packets can be sent.
+ void SetPacingRates(DataRate pacing_rate, DataRate padding_rate);
+
+ // Currently audio traffic is not accounted by pacer and passed through.
+ // With the introduction of audio BWE audio traffic will be accounted for
+ // the pacer budget calculation. The audio traffic still will be injected
+ // at high priority.
+ void SetAccountForAudioPackets(bool account_for_audio);
+
+ // Returns the time since the oldest queued packet was enqueued.
+ TimeDelta OldestPacketWaitTime() const;
+
+ size_t QueueSizePackets() const;
+ DataSize QueueSizeData() const;
+
+ // Returns the time when the first packet was sent;
+ absl::optional<Timestamp> FirstSentPacketTime() const;
+
+ // Returns the number of milliseconds it will take to send the current
+ // packets in the queue, given the current size and bitrate, ignoring prio.
+ TimeDelta ExpectedQueueTime() const;
+
+ void SetQueueTimeLimit(TimeDelta limit);
+
+ // Enable bitrate probing. Enabled by default, mostly here to simplify
+ // testing. Must be called before any packets are being sent to have an
+ // effect.
+ void SetProbingEnabled(bool enabled);
+
+ // Time until next probe should be sent. If this value is set, it should be
+ // respected - i.e. don't call ProcessPackets() before this specified time as
+ // that can have unintended side effects.
+ absl::optional<TimeDelta> TimeUntilNextProbe();
+
+ // Time since ProcessPackets() was last executed.
+ TimeDelta TimeElapsedSinceLastProcess() const;
+
+ TimeDelta TimeUntilAvailableBudget() const;
+
+ // Check queue of pending packets and send them or padding packets, if budget
+ // is available.
+ void ProcessPackets();
+
+ bool Congested() const;
+
+ private:
+ TimeDelta UpdateTimeAndGetElapsed(Timestamp now);
+ bool ShouldSendKeepalive(Timestamp now) const;
+
+ // Updates the number of bytes that can be sent for the next time interval.
+ void UpdateBudgetWithElapsedTime(TimeDelta delta);
+ void UpdateBudgetWithSentData(DataSize size);
+
+ DataSize PaddingToAdd(absl::optional<DataSize> recommended_probe_size,
+ DataSize data_sent);
+
+ RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
+ const PacedPacketInfo& pacing_info);
+ void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet);
+ void OnPaddingSent(DataSize padding_sent);
+
+ Timestamp CurrentTime() const;
+
+ Clock* const clock_;
+ PacketSender* const packet_sender_;
+ const std::unique_ptr<FieldTrialBasedConfig> fallback_field_trials_;
+ const WebRtcKeyValueConfig* field_trials_;
+
+ const bool drain_large_queues_;
+ const bool send_padding_if_silent_;
+ const bool pace_audio_;
+ TimeDelta min_packet_limit_;
+
+ // TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.
+ // The last millisecond timestamp returned by |clock_|.
+ mutable Timestamp last_timestamp_;
+ bool paused_;
+ // This is the media budget, keeping track of how many bits of media
+ // we can pace out during the current interval.
+ IntervalBudget media_budget_;
+ // This is the padding budget, keeping track of how many bits of padding we're
+ // allowed to send out during the current interval. This budget will be
+ // utilized when there's no media to send.
+ IntervalBudget padding_budget_;
+
+ BitrateProber prober_;
+ bool probing_send_failure_;
+ bool padding_failure_state_;
+
+ DataRate pacing_bitrate_;
+
+ Timestamp time_last_process_;
+ Timestamp last_send_time_;
+ absl::optional<Timestamp> first_sent_packet_time_;
+
+ RoundRobinPacketQueue packet_queue_;
+ uint64_t packet_counter_;
+
+ DataSize congestion_window_size_;
+ DataSize outstanding_data_;
+
+ TimeDelta queue_time_limit;
+ bool account_for_audio_;
+
+ // If true, PacedSender should only reference packets as in legacy mode.
+ // If false, PacedSender may have direct ownership of RtpPacketToSend objects.
+ // Defaults to true, will be changed to default false soon.
+ const bool legacy_packet_referencing_;
+};
+} // namespace webrtc
+
+#endif // MODULES_PACING_PACING_CONTROLLER_H_
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
new file mode 100644
index 0000000..a092e01
--- /dev/null
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -0,0 +1,1490 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/pacing_controller.h"
+
+#include <algorithm>
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "absl/memory/memory.h"
+#include "api/units/data_rate.h"
+#include "modules/pacing/packet_router.h"
+#include "system_wrappers/include/clock.h"
+#include "test/field_trial.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+using ::testing::_;
+using ::testing::Field;
+using ::testing::Pointee;
+using ::testing::Property;
+using ::testing::Return;
+
+namespace webrtc {
+namespace test {
+namespace {
+constexpr DataRate kFirstClusterRate = DataRate::KilobitsPerSec<900>();
+constexpr DataRate kSecondClusterRate = DataRate::KilobitsPerSec<1800>();
+
+// The error stems from truncating the time interval of probe packets to integer
+// values. This results in probing slightly higher than the target bitrate.
+// For 1.8 Mbps, this comes to be about 120 kbps with 1200 probe packets.
+constexpr DataRate kProbingErrorMargin = DataRate::KilobitsPerSec<150>();
+
+const float kPaceMultiplier = 2.5f;
+
+constexpr uint32_t kAudioSsrc = 12345;
+constexpr uint32_t kVideoSsrc = 234565;
+constexpr uint32_t kVideoRtxSsrc = 34567;
+constexpr uint32_t kFlexFecSsrc = 45678;
+
+constexpr DataRate kTargetRate = DataRate::KilobitsPerSec<800>();
+
+enum class PacerMode { kReferencePackets, kOwnPackets };
+std::string GetFieldTrialStirng(PacerMode mode) {
+ std::string field_trial = "WebRTC-Pacer-LegacyPacketReferencing/";
+ switch (mode) {
+ case PacerMode::kOwnPackets:
+ field_trial += "Disabled";
+ break;
+ case PacerMode::kReferencePackets:
+ field_trial += "Enabled";
+ break;
+ }
+ field_trial += "/";
+ return field_trial;
+}
+
+// TODO(bugs.webrtc.org/10633): Remove when packets are always owned by pacer.
+RtpPacketSender::Priority PacketTypeToPriority(RtpPacketToSend::Type type) {
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ return RtpPacketSender::Priority::kHighPriority;
+ case RtpPacketToSend::Type::kVideo:
+ return RtpPacketSender::Priority::kLowPriority;
+ case RtpPacketToSend::Type::kRetransmission:
+ return RtpPacketSender::Priority::kNormalPriority;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ return RtpPacketSender::Priority::kLowPriority;
+ break;
+ case RtpPacketToSend::Type::kPadding:
+ RTC_NOTREACHED() << "Unexpected type for legacy path: kPadding";
+ break;
+ }
+ return RtpPacketSender::Priority::kLowPriority;
+}
+
+std::unique_ptr<RtpPacketToSend> BuildPacket(RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t size) {
+ auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ packet->SetSsrc(ssrc);
+ packet->SetSequenceNumber(sequence_number);
+ packet->set_capture_time_ms(capture_time_ms);
+ packet->SetPayloadSize(size);
+ return packet;
+}
+} // namespace
+
+// Mock callback proxy, where both new and old api redirects to common mock
+// methods that focus on core aspects.
+class MockPacingControllerCallback : public PacingController::PacketSender {
+ public:
+ RtpPacketSendResult TimeToSendPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission,
+ const PacedPacketInfo& packet_info) {
+ SendPacket(ssrc, sequence_number, capture_timestamp, retransmission, false);
+ return RtpPacketSendResult::kSuccess;
+ }
+
+ void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) override {
+ SendPacket(packet->Ssrc(), packet->SequenceNumber(),
+ packet->capture_time_ms(),
+ packet->packet_type() == RtpPacketToSend::Type::kRetransmission,
+ packet->packet_type() == RtpPacketToSend::Type::kPadding);
+ }
+
+ DataSize TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& packet_info) override {
+ return DataSize::bytes(SendPadding(size.bytes()));
+ }
+
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize target_size) override {
+ std::vector<std::unique_ptr<RtpPacketToSend>> ret;
+ size_t padding_size = SendPadding(target_size.bytes());
+ if (padding_size > 0) {
+ auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
+ packet->SetPayloadSize(padding_size);
+ packet->set_packet_type(RtpPacketToSend::Type::kPadding);
+ ret.emplace_back(std::move(packet));
+ }
+ return ret;
+ }
+
+ MOCK_METHOD5(SendPacket,
+ void(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_timestamp,
+ bool retransmission,
+ bool padding));
+ MOCK_METHOD1(SendPadding, size_t(size_t target_size));
+};
+
+// Mock callback implementing the raw api.
+class MockPacketSender : public PacingController::PacketSender {
+ public:
+ MOCK_METHOD5(TimeToSendPacket,
+ RtpPacketSendResult(uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission,
+ const PacedPacketInfo& pacing_info));
+ MOCK_METHOD2(TimeToSendPadding,
+ DataSize(DataSize size, const PacedPacketInfo& pacing_info));
+
+ MOCK_METHOD2(SendRtpPacket,
+ void(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info));
+ MOCK_METHOD1(
+ GeneratePadding,
+ std::vector<std::unique_ptr<RtpPacketToSend>>(DataSize target_size));
+};
+
+class PacingControllerPadding : public PacingController::PacketSender {
+ public:
+ static const size_t kPaddingPacketSize = 224;
+
+ PacingControllerPadding() : padding_sent_(0) {}
+
+ RtpPacketSendResult TimeToSendPacket(
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission,
+ const PacedPacketInfo& pacing_info) override {
+ return RtpPacketSendResult::kSuccess;
+ }
+
+ void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& pacing_info) override {}
+
+ DataSize TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& pacing_info) override {
+ size_t num_packets =
+ (size.bytes() + kPaddingPacketSize - 1) / kPaddingPacketSize;
+ padding_sent_ += kPaddingPacketSize * num_packets;
+ return DataSize::bytes(kPaddingPacketSize * num_packets);
+ }
+
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize target_size) override {
+ size_t num_packets =
+ (target_size.bytes() + kPaddingPacketSize - 1) / kPaddingPacketSize;
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets;
+ for (size_t i = 0; i < num_packets; ++i) {
+ packets.emplace_back(absl::make_unique<RtpPacketToSend>(nullptr));
+ packets.back()->SetPadding(kPaddingPacketSize);
+ packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding);
+ padding_sent_ += kPaddingPacketSize;
+ }
+ return packets;
+ }
+
+ size_t padding_sent() { return padding_sent_; }
+
+ private:
+ size_t padding_sent_;
+};
+
+class PacingControllerProbing : public PacingController::PacketSender {
+ public:
+ PacingControllerProbing() : packets_sent_(0), padding_sent_(0) {}
+
+ RtpPacketSendResult TimeToSendPacket(
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ bool retransmission,
+ const PacedPacketInfo& pacing_info) override {
+ ++packets_sent_;
+ return RtpPacketSendResult::kSuccess;
+ }
+
+ void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& pacing_info) override {
+ if (packet->packet_type() != RtpPacketToSend::Type::kPadding) {
+ ++packets_sent_;
+ }
+ }
+
+ DataSize TimeToSendPadding(DataSize size,
+ const PacedPacketInfo& pacing_info) override {
+ padding_sent_ += size.bytes();
+ return DataSize::bytes(padding_sent_);
+ }
+
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize target_size) override {
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets;
+ packets.emplace_back(absl::make_unique<RtpPacketToSend>(nullptr));
+ packets.back()->SetPadding(target_size.bytes());
+ packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding);
+ padding_sent_ += target_size.bytes();
+ return packets;
+ }
+
+ int packets_sent() const { return packets_sent_; }
+
+ int padding_sent() const { return padding_sent_; }
+
+ private:
+ int packets_sent_;
+ int padding_sent_;
+};
+
+class PacingControllerTest : public ::testing::TestWithParam<PacerMode> {
+ protected:
+ PacingControllerTest()
+ : clock_(123456), field_trial_(GetFieldTrialStirng(GetParam())) {
+ srand(0);
+ // Need to initialize PacingController after we initialize clock.
+ pacer_ = absl::make_unique<PacingController>(&clock_, &callback_, nullptr,
+ nullptr);
+ Init();
+ }
+
+ void Init() {
+ pacer_->CreateProbeCluster(kFirstClusterRate, /*cluster_id=*/0);
+ pacer_->CreateProbeCluster(kSecondClusterRate, /*cluster_id=*/1);
+ // Default to bitrate probing disabled for testing purposes. Probing tests
+ // have to enable probing, either by creating a new PacingController
+ // instance or by calling SetProbingEnabled(true).
+ pacer_->SetProbingEnabled(false);
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, DataRate::Zero());
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ }
+
+ void Send(RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t size) {
+ if (GetParam() == PacerMode::kReferencePackets) {
+ pacer_->InsertPacket(PacketTypeToPriority(type), ssrc, sequence_number,
+ capture_time_ms, size,
+ type == RtpPacketToSend::Type::kRetransmission);
+ } else {
+ pacer_->EnqueuePacket(
+ BuildPacket(type, ssrc, sequence_number, capture_time_ms, size));
+ }
+ }
+
+ void SendAndExpectPacket(RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t sequence_number,
+ int64_t capture_time_ms,
+ size_t size) {
+ Send(type, ssrc, sequence_number, capture_time_ms, size);
+ EXPECT_CALL(
+ callback_,
+ SendPacket(ssrc, sequence_number, capture_time_ms,
+ type == RtpPacketToSend::Type::kRetransmission, false))
+ .Times(1);
+ }
+
+ void ExpectSendPadding() {
+ if (GetParam() == PacerMode::kOwnPackets) {
+ EXPECT_CALL(callback_, SendPacket(_, _, _, _, true)).Times(1);
+ }
+ }
+
+ std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+ auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ packet->SetSsrc(kAudioSsrc);
+ break;
+ case RtpPacketToSend::Type::kVideo:
+ packet->SetSsrc(kVideoSsrc);
+ break;
+ case RtpPacketToSend::Type::kRetransmission:
+ case RtpPacketToSend::Type::kPadding:
+ packet->SetSsrc(kVideoRtxSsrc);
+ break;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ packet->SetSsrc(kFlexFecSsrc);
+ break;
+ }
+
+ packet->SetPayloadSize(234);
+ return packet;
+ }
+
+ TimeDelta TimeUntilNextProcess() {
+ // TODO(bugs.webrtc.org/10809): Replace this with TimeUntilAvailableBudget()
+ // once ported from WIP code. For now, emulate PacedSender method.
+
+ TimeDelta elapsed_time = pacer_->TimeElapsedSinceLastProcess();
+ if (pacer_->IsPaused()) {
+ return std::max(PacingController::kPausedProcessInterval - elapsed_time,
+ TimeDelta::Zero());
+ }
+
+ auto next_probe = pacer_->TimeUntilNextProbe();
+ if (next_probe) {
+ return *next_probe;
+ }
+
+ const TimeDelta min_packet_limit = TimeDelta::ms(5);
+ return std::max(min_packet_limit - elapsed_time, TimeDelta::Zero());
+ }
+
+ SimulatedClock clock_;
+ ScopedFieldTrials field_trial_;
+ MockPacingControllerCallback callback_;
+ std::unique_ptr<PacingController> pacer_;
+};
+
+class PacingControllerFieldTrialTest
+ : public ::testing::TestWithParam<PacerMode> {
+ protected:
+ struct MediaStream {
+ const RtpPacketToSend::Type type;
+ const uint32_t ssrc;
+ const size_t packet_size;
+ uint16_t seq_num;
+ };
+
+ const int kProcessIntervalsPerSecond = 1000 / 5;
+
+ PacingControllerFieldTrialTest() : clock_(123456) {}
+ void InsertPacket(PacingController* pacer, MediaStream* stream) {
+ if (GetParam() == PacerMode::kReferencePackets) {
+ pacer->InsertPacket(PacketTypeToPriority(stream->type), stream->ssrc,
+ stream->seq_num++, clock_.TimeInMilliseconds(),
+ stream->packet_size, false);
+ } else {
+ pacer->EnqueuePacket(
+ BuildPacket(stream->type, stream->ssrc, stream->seq_num++,
+ clock_.TimeInMilliseconds(), stream->packet_size));
+ }
+ }
+ void ProcessNext(PacingController* pacer) {
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer->ProcessPackets();
+ }
+ MediaStream audio{/*type*/ RtpPacketToSend::Type::kAudio,
+ /*ssrc*/ 3333, /*packet_size*/ 100, /*seq_num*/ 1000};
+ MediaStream video{/*type*/ RtpPacketToSend::Type::kVideo,
+ /*ssrc*/ 4444, /*packet_size*/ 1000, /*seq_num*/ 1000};
+ SimulatedClock clock_;
+ MockPacingControllerCallback callback_;
+};
+
+TEST_P(PacingControllerFieldTrialTest, DefaultNoPaddingInSilence) {
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(kTargetRate, DataRate::Zero());
+ // Video packet to reset last send time and provide padding data.
+ InsertPacket(&pacer, &video);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer.ProcessPackets();
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ // Waiting 500 ms should not trigger sending of padding.
+ clock_.AdvanceTimeMilliseconds(500);
+ pacer.ProcessPackets();
+}
+
+TEST_P(PacingControllerFieldTrialTest, PaddingInSilenceWithTrial) {
+ ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) +
+ "WebRTC-Pacer-PadInSilence/Enabled/");
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(kTargetRate, DataRate::Zero());
+ // Video packet to reset last send time and provide padding data.
+ InsertPacket(&pacer, &video);
+ if (GetParam() == PacerMode::kReferencePackets) {
+ // Only payload, not padding, sent by pacer in legacy mode.
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ } else {
+ EXPECT_CALL(callback_, SendPacket).Times(2);
+ }
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer.ProcessPackets();
+ EXPECT_CALL(callback_, SendPadding).WillOnce(Return(1000));
+ // Waiting 500 ms should trigger sending of padding.
+ clock_.AdvanceTimeMilliseconds(500);
+ pacer.ProcessPackets();
+}
+
+TEST_P(PacingControllerFieldTrialTest, DefaultCongestionWindowAffectsAudio) {
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero());
+ pacer.SetCongestionWindow(DataSize::bytes(800));
+ pacer.UpdateOutstandingData(DataSize::Zero());
+ // Video packet fills congestion window.
+ InsertPacket(&pacer, &video);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+ // Audio packet blocked due to congestion.
+ InsertPacket(&pacer, &audio);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ ProcessNext(&pacer);
+ ProcessNext(&pacer);
+ // Audio packet unblocked when congestion window clear.
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ pacer.UpdateOutstandingData(DataSize::Zero());
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+}
+
+TEST_P(PacingControllerFieldTrialTest,
+ CongestionWindowDoesNotAffectAudioInTrial) {
+ ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) +
+ "WebRTC-Pacer-BlockAudio/Disabled/");
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(DataRate::bps(10000000), DataRate::Zero());
+ pacer.SetCongestionWindow(DataSize::bytes(800));
+ pacer.UpdateOutstandingData(DataSize::Zero());
+ // Video packet fills congestion window.
+ InsertPacket(&pacer, &video);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+ // Audio not blocked due to congestion.
+ InsertPacket(&pacer, &audio);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+}
+
+TEST_P(PacingControllerFieldTrialTest, DefaultBudgetAffectsAudio) {
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(
+ DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond),
+ DataRate::Zero());
+ // Video fills budget for following process periods.
+ InsertPacket(&pacer, &video);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+ // Audio packet blocked due to budget limit.
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ InsertPacket(&pacer, &audio);
+ ProcessNext(&pacer);
+ ProcessNext(&pacer);
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ // Audio packet unblocked when the budget has recovered.
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+ ProcessNext(&pacer);
+}
+
+TEST_P(PacingControllerFieldTrialTest, BudgetDoesNotAffectAudioInTrial) {
+ ScopedFieldTrials trial(GetFieldTrialStirng(GetParam()) +
+ "WebRTC-Pacer-BlockAudio/Disabled/");
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ PacingController pacer(&clock_, &callback_, nullptr, nullptr);
+ pacer.SetPacingRates(
+ DataRate::bps(video.packet_size / 3 * 8 * kProcessIntervalsPerSecond),
+ DataRate::Zero());
+ // Video fills budget for following process periods.
+ InsertPacket(&pacer, &video);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ ProcessNext(&pacer);
+ // Audio packet not blocked due to budget limit.
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ InsertPacket(&pacer, &audio);
+ ProcessNext(&pacer);
+}
+
+INSTANTIATE_TEST_SUITE_P(ReferencingAndOwningPackets,
+ PacingControllerFieldTrialTest,
+ ::testing::Values(PacerMode::kReferencePackets,
+ PacerMode::kOwnPackets));
+
+TEST_P(PacingControllerTest, FirstSentPacketTimeIsSet) {
+ uint16_t sequence_number = 1234;
+ const uint32_t kSsrc = 12345;
+ const size_t kSizeBytes = 250;
+ const size_t kPacketToSend = 3;
+ const Timestamp kStartTime = clock_.CurrentTime();
+
+ // No packet sent.
+ EXPECT_FALSE(pacer_->FirstSentPacketTime().has_value());
+
+ for (size_t i = 0; i < kPacketToSend; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, kSsrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kSizeBytes);
+ pacer_->ProcessPackets();
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ }
+ EXPECT_EQ(kStartTime, pacer_->FirstSentPacketTime());
+}
+
+TEST_P(PacingControllerTest, QueuePacket) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+
+ int64_t queued_packet_timestamp = clock_.TimeInMilliseconds();
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
+ queued_packet_timestamp, 250);
+ EXPECT_EQ(packets_to_send + 1, pacer_->QueueSizePackets());
+ pacer_->ProcessPackets();
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ clock_.AdvanceTimeMilliseconds(5);
+ EXPECT_EQ(1u, pacer_->QueueSizePackets());
+ EXPECT_CALL(callback_, SendPacket(ssrc, sequence_number++,
+ queued_packet_timestamp, false, false))
+ .Times(1);
+ pacer_->ProcessPackets();
+ sequence_number++;
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+
+ // We can send packets_to_send -1 packets of size 250 during the current
+ // interval since one packet has already been sent.
+ for (size_t i = 0; i < packets_to_send - 1; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ EXPECT_EQ(packets_to_send, pacer_->QueueSizePackets());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(1u, pacer_->QueueSizePackets());
+}
+
+TEST_P(PacingControllerTest, PaceQueuedPackets) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+
+ for (size_t j = 0; j < packets_to_send_per_interval * 10; ++j) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+ EXPECT_EQ(packets_to_send_per_interval + packets_to_send_per_interval * 10,
+ pacer_->QueueSizePackets());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(packets_to_send_per_interval * 10, pacer_->QueueSizePackets());
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ for (int k = 0; k < 10; ++k) {
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, false))
+ .Times(packets_to_send_per_interval);
+ pacer_->ProcessPackets();
+ }
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+ pacer_->ProcessPackets();
+
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), 250);
+ pacer_->ProcessPackets();
+ EXPECT_EQ(1u, pacer_->QueueSizePackets());
+}
+
+TEST_P(PacingControllerTest, RepeatedRetransmissionsAllowed) {
+ // Send one packet, then two retransmissions of that packet.
+ for (size_t i = 0; i < 3; i++) {
+ constexpr uint32_t ssrc = 333;
+ constexpr uint16_t sequence_number = 444;
+ constexpr size_t bytes = 250;
+ bool is_retransmission = (i != 0); // Original followed by retransmissions.
+ SendAndExpectPacket(
+ is_retransmission ? RtpPacketToSend::Type::kRetransmission
+ : RtpPacketToSend::Type::kVideo,
+ ssrc, sequence_number, clock_.TimeInMilliseconds(), bytes);
+ clock_.AdvanceTimeMilliseconds(5);
+ }
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest,
+ CanQueuePacketsWithSameSequenceNumberOnDifferentSsrcs) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), 250);
+
+ // Expect packet on second ssrc to be queued and sent as well.
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc + 1, sequence_number,
+ clock_.TimeInMilliseconds(), 250);
+
+ clock_.AdvanceTimeMilliseconds(1000);
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, Padding) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate);
+
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+ // No padding is expected since we have sent too much already.
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ pacer_->ProcessPackets();
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+
+ // 5 milliseconds later should not send padding since we filled the buffers
+ // initially.
+ EXPECT_CALL(callback_, SendPadding(250)).Times(0);
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+
+ // 5 milliseconds later we have enough budget to send some padding.
+ EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250));
+ ExpectSendPadding();
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, NoPaddingBeforeNormalPacket) {
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate);
+
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ pacer_->ProcessPackets();
+ clock_.AdvanceTime(TimeUntilNextProcess());
+
+ pacer_->ProcessPackets();
+ clock_.AdvanceTime(TimeUntilNextProcess());
+
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250));
+ ExpectSendPadding();
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, VerifyPaddingUpToBitrate) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ const int kTimeStep = 5;
+ const int64_t kBitrateWindow = 100;
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate);
+
+ int64_t start_time = clock_.TimeInMilliseconds();
+ while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ EXPECT_CALL(callback_, SendPadding(250)).WillOnce(Return(250));
+ ExpectSendPadding();
+ pacer_->ProcessPackets();
+ clock_.AdvanceTimeMilliseconds(kTimeStep);
+ }
+}
+
+TEST_P(PacingControllerTest, VerifyAverageBitrateVaryingMediaPayload) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ const int kTimeStep = 5;
+ const int64_t kBitrateWindow = 10000;
+ PacingControllerPadding callback;
+ pacer_ =
+ absl::make_unique<PacingController>(&clock_, &callback, nullptr, nullptr);
+ pacer_->SetProbingEnabled(false);
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate);
+
+ int64_t start_time = clock_.TimeInMilliseconds();
+ size_t media_bytes = 0;
+ while (clock_.TimeInMilliseconds() - start_time < kBitrateWindow) {
+ int rand_value = rand(); // NOLINT (rand_r instead of rand)
+ size_t media_payload = rand_value % 100 + 200; // [200, 300] bytes.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ capture_time_ms, media_payload);
+ media_bytes += media_payload;
+ clock_.AdvanceTimeMilliseconds(kTimeStep);
+ pacer_->ProcessPackets();
+ }
+ EXPECT_NEAR(kTargetRate.kbps(),
+ static_cast<int>(8 * (media_bytes + callback.padding_sent()) /
+ kBitrateWindow),
+ 1);
+}
+
+TEST_P(PacingControllerTest, Priority) {
+ uint32_t ssrc_low_priority = 12345;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+ int64_t capture_time_ms_low_priority = 1234567;
+
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kRetransmission, ssrc,
+ sequence_number++, clock_.TimeInMilliseconds(), 250);
+ }
+ pacer_->ProcessPackets();
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+
+ // Expect normal and low priority to be queued and high to pass through.
+ Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++,
+ capture_time_ms_low_priority, 250);
+
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ }
+ Send(RtpPacketToSend::Type::kAudio, ssrc, sequence_number++, capture_time_ms,
+ 250);
+
+ // Expect all high and normal priority to be sent out first.
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, _, _))
+ .Times(packets_to_send_per_interval + 1);
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(1u, pacer_->QueueSizePackets());
+
+ EXPECT_CALL(callback_, SendPacket(ssrc_low_priority, _,
+ capture_time_ms_low_priority, _, _))
+ .Times(1);
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, RetransmissionPriority) {
+ uint32_t ssrc = 12345;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 45678;
+ int64_t capture_time_ms_retransmission = 56789;
+
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ pacer_->ProcessPackets();
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+
+ // Alternate retransmissions and normal packets.
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++,
+ capture_time_ms_retransmission, 250);
+ }
+ EXPECT_EQ(2 * packets_to_send_per_interval, pacer_->QueueSizePackets());
+
+ // Expect all retransmissions to be sent out first despite having a later
+ // capture time.
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ EXPECT_CALL(callback_, SendPacket(_, _, _, false, _)).Times(0);
+ EXPECT_CALL(callback_,
+ SendPacket(ssrc, _, capture_time_ms_retransmission, true, _))
+ .Times(packets_to_send_per_interval);
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(packets_to_send_per_interval, pacer_->QueueSizePackets());
+
+ // Expect the remaining (non-retransmission) packets to be sent.
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ EXPECT_CALL(callback_, SendPacket(_, _, _, true, _)).Times(0);
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, false, _))
+ .Times(packets_to_send_per_interval);
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+}
+
+TEST_P(PacingControllerTest, HighPrioDoesntAffectBudget) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = 56789;
+
+ // As high prio packets doesn't affect the budget, we should be able to send
+ // a high number of them at once.
+ for (int i = 0; i < 25; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kAudio, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ }
+ pacer_->ProcessPackets();
+ // Low prio packets does affect the budget.
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number, capture_time_ms,
+ 250);
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(1u, pacer_->QueueSizePackets());
+ EXPECT_CALL(callback_,
+ SendPacket(ssrc, sequence_number++, capture_time_ms, false, _))
+ .Times(1);
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(0u, pacer_->QueueSizePackets());
+}
+
+TEST_P(PacingControllerTest, SendsOnlyPaddingWhenCongested) {
+ uint32_t ssrc = 202020;
+ uint16_t sequence_number = 1000;
+ int kPacketSize = 250;
+ int kCongestionWindow = kPacketSize * 10;
+
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ pacer_->SetCongestionWindow(DataSize::bytes(kCongestionWindow));
+ int sent_data = 0;
+ while (sent_data < kCongestionWindow) {
+ sent_data += kPacketSize;
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ }
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+
+ size_t blocked_packets = 0;
+ int64_t expected_time_until_padding = 500;
+ while (expected_time_until_padding > 5) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ blocked_packets++;
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ expected_time_until_padding -= 5;
+ }
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
+ ExpectSendPadding();
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ EXPECT_EQ(blocked_packets, pacer_->QueueSizePackets());
+}
+
+TEST_P(PacingControllerTest, DoesNotAllowOveruseAfterCongestion) {
+ uint32_t ssrc = 202020;
+ uint16_t seq_num = 1000;
+ int size = 1000;
+ auto now_ms = [this] { return clock_.TimeInMilliseconds(); };
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ // The pacing rate is low enough that the budget should not allow two packets
+ // to be sent in a row.
+ pacer_->SetPacingRates(DataRate::bps(400 * 8 * 1000 / 5), DataRate::Zero());
+ // The congestion window is small enough to only let one packet through.
+ pacer_->SetCongestionWindow(DataSize::bytes(800));
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ // Not yet budget limited or congested, packet is sent.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ // Packet blocked due to congestion.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ // Packet blocked due to congestion.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ // Congestion removed and budget has recovered, packet is sent.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
+ EXPECT_CALL(callback_, SendPacket).Times(1);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ // Should be blocked due to budget limitation as congestion has be removed.
+ Send(RtpPacketToSend::Type::kVideo, ssrc, seq_num++, now_ms(), size);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, ResumesSendingWhenCongestionEnds) {
+ uint32_t ssrc = 202020;
+ uint16_t sequence_number = 1000;
+ int64_t kPacketSize = 250;
+ int64_t kCongestionCount = 10;
+ int64_t kCongestionWindow = kPacketSize * kCongestionCount;
+ int64_t kCongestionTimeMs = 1000;
+
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ pacer_->SetCongestionWindow(DataSize::bytes(kCongestionWindow));
+ int sent_data = 0;
+ while (sent_data < kCongestionWindow) {
+ sent_data += kPacketSize;
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ }
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ EXPECT_CALL(callback_, SendPacket).Times(0);
+ int unacked_packets = 0;
+ for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ unacked_packets++;
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ }
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+
+ // First mark half of the congested packets as cleared and make sure that just
+ // as many are sent
+ int ack_count = kCongestionCount / 2;
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _)).Times(ack_count);
+ pacer_->UpdateOutstandingData(
+ DataSize::bytes(kCongestionWindow - kPacketSize * ack_count));
+
+ for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ }
+ unacked_packets -= ack_count;
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+
+ // Second make sure all packets are sent if sent packets are continuously
+ // marked as acked.
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, _, false, _))
+ .Times(unacked_packets);
+ for (int duration = 0; duration < kCongestionTimeMs; duration += 5) {
+ pacer_->UpdateOutstandingData(DataSize::Zero());
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ }
+}
+
+TEST_P(PacingControllerTest, Pause) {
+ uint32_t ssrc_low_priority = 12345;
+ uint32_t ssrc = 12346;
+ uint32_t ssrc_high_priority = 12347;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = clock_.TimeInMilliseconds();
+
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+
+ // Due to the multiplicative factor we can send 5 packets during a send
+ // interval. (network capacity * multiplier / (8 bits per byte *
+ // (packet size * #send intervals per second)
+ const size_t packets_to_send_per_interval =
+ kTargetRate.bps() * kPaceMultiplier / (8 * 250 * 200);
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), 250);
+ }
+
+ pacer_->ProcessPackets();
+
+ pacer_->Pause();
+
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++,
+ capture_time_ms, 250);
+ Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++,
+ capture_time_ms, 250);
+ Send(RtpPacketToSend::Type::kAudio, ssrc_high_priority, sequence_number++,
+ capture_time_ms, 250);
+ }
+ clock_.AdvanceTimeMilliseconds(10000);
+ int64_t second_capture_time_ms = clock_.TimeInMilliseconds();
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc_low_priority, sequence_number++,
+ second_capture_time_ms, 250);
+ Send(RtpPacketToSend::Type::kRetransmission, ssrc, sequence_number++,
+ second_capture_time_ms, 250);
+ Send(RtpPacketToSend::Type::kAudio, ssrc_high_priority, sequence_number++,
+ second_capture_time_ms, 250);
+ }
+
+ // Expect everything to be queued.
+ EXPECT_EQ(TimeDelta::ms(second_capture_time_ms - capture_time_ms),
+ pacer_->OldestPacketWaitTime());
+
+ EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
+ ExpectSendPadding();
+ pacer_->ProcessPackets();
+
+ int64_t expected_time_until_send = 500;
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ while (expected_time_until_send >= 5) {
+ pacer_->ProcessPackets();
+ clock_.AdvanceTimeMilliseconds(5);
+ expected_time_until_send -= 5;
+ }
+
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+ EXPECT_CALL(callback_, SendPadding(1)).WillOnce(Return(1));
+ ExpectSendPadding();
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->ProcessPackets();
+ ::testing::Mock::VerifyAndClearExpectations(&callback_);
+
+ // Expect high prio packets to come out first followed by normal
+ // prio packets and low prio packets (all in capture order).
+ {
+ ::testing::InSequence sequence;
+ EXPECT_CALL(callback_,
+ SendPacket(ssrc_high_priority, _, capture_time_ms, _, _))
+ .Times(packets_to_send_per_interval);
+ EXPECT_CALL(callback_,
+ SendPacket(ssrc_high_priority, _, second_capture_time_ms, _, _))
+ .Times(packets_to_send_per_interval);
+
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, capture_time_ms, _, _))
+ .Times(1);
+ }
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ EXPECT_CALL(callback_, SendPacket(ssrc, _, second_capture_time_ms, _, _))
+ .Times(1);
+ }
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ EXPECT_CALL(callback_,
+ SendPacket(ssrc_low_priority, _, capture_time_ms, _, _))
+ .Times(1);
+ }
+ for (size_t i = 0; i < packets_to_send_per_interval; ++i) {
+ EXPECT_CALL(callback_, SendPacket(ssrc_low_priority, _,
+ second_capture_time_ms, _, _))
+ .Times(1);
+ }
+ }
+ pacer_->Resume();
+
+ // The pacer was resumed directly after the previous process call finished. It
+ // will therefore wait 5 ms until next process.
+ clock_.AdvanceTime(TimeUntilNextProcess());
+
+ for (size_t i = 0; i < 4; i++) {
+ pacer_->ProcessPackets();
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ }
+
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+}
+
+TEST_P(PacingControllerTest, ResendPacket) {
+ if (GetParam() == PacerMode::kOwnPackets) {
+ // This test only makes sense when re-sending is supported.
+ return;
+ }
+
+ MockPacketSender callback;
+
+ // Need to initialize PacedSender after we initialize clock.
+ pacer_ =
+ absl::make_unique<PacingController>(&clock_, &callback, nullptr, nullptr);
+ Init();
+
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ int64_t capture_time_ms = clock_.TimeInMilliseconds();
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+
+ pacer_->InsertPacket(RtpPacketSender::kNormalPriority, ssrc, sequence_number,
+ capture_time_ms, 250, false);
+ clock_.AdvanceTimeMilliseconds(1);
+ pacer_->InsertPacket(RtpPacketSender::kNormalPriority, ssrc,
+ sequence_number + 1, capture_time_ms + 1, 250, false);
+ clock_.AdvanceTimeMilliseconds(9999);
+ EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms),
+ pacer_->OldestPacketWaitTime());
+ // Fails to send first packet so only one call.
+ EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms,
+ false, _))
+ .Times(1)
+ .WillOnce(Return(RtpPacketSendResult::kTransportUnavailable));
+ clock_.AdvanceTimeMilliseconds(10000);
+ pacer_->ProcessPackets();
+
+ // Queue remains unchanged.
+ EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms),
+ pacer_->OldestPacketWaitTime());
+
+ // Fails to send second packet.
+ EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number, capture_time_ms,
+ false, _))
+ .WillOnce(Return(RtpPacketSendResult::kSuccess));
+ EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1,
+ capture_time_ms + 1, false, _))
+ .WillOnce(Return(RtpPacketSendResult::kTransportUnavailable));
+ clock_.AdvanceTimeMilliseconds(10000);
+ pacer_->ProcessPackets();
+
+ // Queue is reduced by 1 packet.
+ EXPECT_EQ(TimeDelta::ms(clock_.TimeInMilliseconds() - capture_time_ms - 1),
+ pacer_->OldestPacketWaitTime());
+
+ // Send second packet and queue becomes empty.
+ EXPECT_CALL(callback, TimeToSendPacket(ssrc, sequence_number + 1,
+ capture_time_ms + 1, false, _))
+ .WillOnce(Return(RtpPacketSendResult::kSuccess));
+ clock_.AdvanceTimeMilliseconds(10000);
+ pacer_->ProcessPackets();
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+}
+
+TEST_P(PacingControllerTest, ExpectedQueueTimeMs) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kNumPackets = 60;
+ const size_t kPacketSize = 1200;
+ const int32_t kMaxBitrate = kPaceMultiplier * 30000;
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+
+ pacer_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier),
+ DataRate::Zero());
+ for (size_t i = 0; i < kNumPackets; ++i) {
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ }
+
+ // Queue in ms = 1000 * (bytes in queue) *8 / (bits per second)
+ TimeDelta queue_time =
+ TimeDelta::ms(1000 * kNumPackets * kPacketSize * 8 / kMaxBitrate);
+ EXPECT_EQ(queue_time, pacer_->ExpectedQueueTime());
+
+ const Timestamp time_start = clock_.CurrentTime();
+ while (pacer_->QueueSizePackets() > 0) {
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ }
+ TimeDelta duration = clock_.CurrentTime() - time_start;
+
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->ExpectedQueueTime());
+
+ // Allow for aliasing, duration should be within one pack of max time limit.
+ const TimeDelta deviation =
+ duration - PacingController::kMaxExpectedQueueLength;
+ EXPECT_LT(deviation.Abs(),
+ TimeDelta::ms(1000 * kPacketSize * 8 / kMaxBitrate));
+}
+
+TEST_P(PacingControllerTest, QueueTimeGrowsOverTime) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+
+ pacer_->SetPacingRates(DataRate::bps(30000 * kPaceMultiplier),
+ DataRate::Zero());
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number,
+ clock_.TimeInMilliseconds(), 1200);
+
+ clock_.AdvanceTimeMilliseconds(500);
+ EXPECT_EQ(TimeDelta::ms(500), pacer_->OldestPacketWaitTime());
+ pacer_->ProcessPackets();
+ EXPECT_EQ(TimeDelta::Zero(), pacer_->OldestPacketWaitTime());
+}
+
+TEST_P(PacingControllerTest, ProbingWithInsertedPackets) {
+ const size_t kPacketSize = 1200;
+ const int kInitialBitrateBps = 300000;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+
+ PacingControllerProbing packet_sender;
+ pacer_ = absl::make_unique<PacingController>(&clock_, &packet_sender, nullptr,
+ nullptr);
+ pacer_->CreateProbeCluster(kFirstClusterRate,
+ /*cluster_id=*/0);
+ pacer_->CreateProbeCluster(kSecondClusterRate,
+ /*cluster_id=*/1);
+ pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier),
+ DataRate::Zero());
+
+ for (int i = 0; i < 10; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ }
+
+ int64_t start = clock_.TimeInMilliseconds();
+ while (packet_sender.packets_sent() < 5) {
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ }
+ int packets_sent = packet_sender.packets_sent();
+ // Validate first cluster bitrate. Note that we have to account for number
+ // of intervals and hence (packets_sent - 1) on the first cluster.
+ EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 /
+ (clock_.TimeInMilliseconds() - start),
+ kFirstClusterRate.bps(), kProbingErrorMargin.bps());
+ EXPECT_EQ(0, packet_sender.padding_sent());
+
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ start = clock_.TimeInMilliseconds();
+ while (packet_sender.packets_sent() < 10) {
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ }
+ packets_sent = packet_sender.packets_sent() - packets_sent;
+ // Validate second cluster bitrate.
+ EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 /
+ (clock_.TimeInMilliseconds() - start),
+ kSecondClusterRate.bps(), kProbingErrorMargin.bps());
+}
+
+TEST_P(PacingControllerTest, ProbingWithPaddingSupport) {
+ const size_t kPacketSize = 1200;
+ const int kInitialBitrateBps = 300000;
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+
+ PacingControllerProbing packet_sender;
+ pacer_ = absl::make_unique<PacingController>(&clock_, &packet_sender, nullptr,
+ nullptr);
+ pacer_->CreateProbeCluster(kFirstClusterRate,
+ /*cluster_id=*/0);
+ pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier),
+ DataRate::Zero());
+
+ for (int i = 0; i < 3; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ }
+
+ int64_t start = clock_.TimeInMilliseconds();
+ int process_count = 0;
+ while (process_count < 5) {
+ clock_.AdvanceTime(TimeUntilNextProcess());
+ pacer_->ProcessPackets();
+ ++process_count;
+ }
+ int packets_sent = packet_sender.packets_sent();
+ int padding_sent = packet_sender.padding_sent();
+ EXPECT_GT(packets_sent, 0);
+ EXPECT_GT(padding_sent, 0);
+ // Note that the number of intervals here for kPacketSize is
+ // packets_sent due to padding in the same cluster.
+ EXPECT_NEAR((packets_sent * kPacketSize * 8000 + padding_sent) /
+ (clock_.TimeInMilliseconds() - start),
+ kFirstClusterRate.bps(), kProbingErrorMargin.bps());
+}
+
+TEST_P(PacingControllerTest, PaddingOveruse) {
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kPacketSize = 1200;
+
+ pacer_->ProcessPackets();
+ pacer_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier),
+ DataRate::Zero());
+
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ pacer_->ProcessPackets();
+
+ // Add 30kbit padding. When increasing budget, media budget will increase from
+ // negative (overuse) while padding budget will increase from 0.
+ clock_.AdvanceTimeMilliseconds(5);
+ pacer_->SetPacingRates(DataRate::bps(60000 * kPaceMultiplier),
+ DataRate::bps(30000));
+
+ SendAndExpectPacket(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ EXPECT_LT(TimeDelta::ms(5), pacer_->ExpectedQueueTime());
+ // Don't send padding if queue is non-empty, even if padding budget > 0.
+ EXPECT_CALL(callback_, SendPadding).Times(0);
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, ProbeClusterId) {
+ MockPacketSender callback;
+
+ pacer_ =
+ absl::make_unique<PacingController>(&clock_, &callback, nullptr, nullptr);
+ Init();
+
+ uint32_t ssrc = 12346;
+ uint16_t sequence_number = 1234;
+ const size_t kPacketSize = 1200;
+
+ pacer_->SetPacingRates(kTargetRate * kPaceMultiplier, kTargetRate);
+ pacer_->SetProbingEnabled(true);
+ for (int i = 0; i < 10; ++i) {
+ Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), kPacketSize);
+ }
+
+ // First probing cluster.
+ if (GetParam() == PacerMode::kReferencePackets) {
+ EXPECT_CALL(callback,
+ TimeToSendPacket(_, _, _, _,
+ Field(&PacedPacketInfo::probe_cluster_id, 0)))
+ .Times(5)
+ .WillRepeatedly(Return(RtpPacketSendResult::kSuccess));
+ } else {
+ EXPECT_CALL(callback,
+ SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 0)))
+ .Times(5);
+ }
+
+ for (int i = 0; i < 5; ++i) {
+ clock_.AdvanceTimeMilliseconds(20);
+ pacer_->ProcessPackets();
+ }
+
+ // Second probing cluster.
+ if (GetParam() == PacerMode::kReferencePackets) {
+ EXPECT_CALL(callback,
+ TimeToSendPacket(_, _, _, _,
+ Field(&PacedPacketInfo::probe_cluster_id, 1)))
+ .Times(5)
+ .WillRepeatedly(Return(RtpPacketSendResult::kSuccess));
+ EXPECT_CALL(callback, TimeToSendPadding).WillOnce(Return(DataSize::Zero()));
+ } else {
+ EXPECT_CALL(callback,
+ SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, 1)))
+ .Times(5);
+ }
+
+ for (int i = 0; i < 5; ++i) {
+ clock_.AdvanceTimeMilliseconds(20);
+ pacer_->ProcessPackets();
+ }
+
+ // Needed for the Field comparer below.
+ const int kNotAProbe = PacedPacketInfo::kNotAProbe;
+ // No more probing packets.
+ if (GetParam() == PacerMode::kReferencePackets) {
+ EXPECT_CALL(callback,
+ TimeToSendPadding(
+ _, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe)))
+ .WillOnce(Return(DataSize::bytes(500)));
+ } else {
+ EXPECT_CALL(callback, GeneratePadding).WillOnce([&](DataSize padding_size) {
+ std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets;
+ padding_packets.emplace_back(
+ BuildPacket(RtpPacketToSend::Type::kPadding, ssrc, sequence_number++,
+ clock_.TimeInMilliseconds(), padding_size.bytes()));
+ return padding_packets;
+ });
+ EXPECT_CALL(
+ callback,
+ SendRtpPacket(_, Field(&PacedPacketInfo::probe_cluster_id, kNotAProbe)))
+ .Times(1);
+ }
+ pacer_->ProcessPackets();
+}
+
+TEST_P(PacingControllerTest, OwnedPacketPrioritizedOnType) {
+ if (GetParam() != PacerMode::kOwnPackets) {
+ // This test only makes sense when using the new code path.
+ return;
+ }
+
+ MockPacketSender callback;
+ pacer_ =
+ absl::make_unique<PacingController>(&clock_, &callback, nullptr, nullptr);
+ Init();
+
+ // Insert a packet of each type, from low to high priority. Since priority
+ // is weighted higher than insert order, these should come out of the pacer
+ // in backwards order with the exception of FEC and Video.
+ for (RtpPacketToSend::Type type :
+ {RtpPacketToSend::Type::kPadding,
+ RtpPacketToSend::Type::kForwardErrorCorrection,
+ RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission,
+ RtpPacketToSend::Type::kAudio}) {
+ pacer_->EnqueuePacket(BuildRtpPacket(type));
+ }
+
+ ::testing::InSequence seq;
+ EXPECT_CALL(
+ callback,
+ SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
+ EXPECT_CALL(callback,
+ SendRtpPacket(
+ Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+
+ // FEC and video actually have the same priority, so will come out in
+ // insertion order.
+ EXPECT_CALL(callback,
+ SendRtpPacket(
+ Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
+ EXPECT_CALL(
+ callback,
+ SendRtpPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
+
+ EXPECT_CALL(callback,
+ SendRtpPacket(
+ Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+
+ clock_.AdvanceTimeMilliseconds(200);
+ pacer_->ProcessPackets();
+}
+
+INSTANTIATE_TEST_SUITE_P(ReferencingAndOwningPackets,
+ PacingControllerTest,
+ ::testing::Values(PacerMode::kReferencePackets,
+ PacerMode::kOwnPackets));
+
+} // namespace test
+} // namespace webrtc