Merges RtpTransportControllerSend with SendSideCongestionController.
Bug: webrtc:9586
Change-Id: I50332f2e128f107e40af7776be0ed530e20774d9
Reviewed-on: https://webrtc-review.googlesource.com/c/113183
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25922}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index fea81b5..4d7ccca 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -123,6 +123,7 @@
"..:webrtc_common",
"../api:fec_controller_api",
"../api:transport_api",
+ "../api/transport:goog_cc",
"../api/transport:network_control",
"../api/units:data_rate",
"../api/units:time_delta",
@@ -131,7 +132,8 @@
"../api/video_codecs:video_codecs_api",
"../logging:rtc_event_log_api",
"../modules/congestion_controller",
- "../modules/congestion_controller/rtp:congestion_controller",
+ "../modules/congestion_controller/rtp:control_handler",
+ "../modules/congestion_controller/rtp:transport_feedback",
"../modules/pacing",
"../modules/rtp_rtcp:rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index ac0f8de..cb28eca 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -12,14 +12,13 @@
#include "absl/memory/memory.h"
#include "absl/types/optional.h"
+#include "api/transport/goog_cc_factory.h"
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "call/rtp_transport_controller_send.h"
#include "call/rtp_video_sender.h"
-#include "modules/congestion_controller/include/send_side_congestion_controller.h"
-#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
#include "rtc_base/checks.h"
#include "rtc_base/location.h"
#include "rtc_base/logging.h"
@@ -27,35 +26,87 @@
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
+class RtpTransportControllerSend::PeriodicTask : public rtc::QueuedTask {
+ public:
+ virtual void Stop() = 0;
+};
+
namespace {
static const int64_t kRetransmitWindowSizeMs = 500;
static const size_t kMaxOverheadBytes = 500;
-const char kTaskQueueExperiment[] = "WebRTC-TaskQueueCongestionControl";
-using TaskQueueController = webrtc::webrtc_cc::SendSideCongestionController;
-std::unique_ptr<SendSideCongestionControllerInterface> CreateController(
- Clock* clock,
- rtc::TaskQueue* task_queue,
- webrtc::RtcEventLog* event_log,
- PacedSender* pacer,
- const BitrateConstraints& bitrate_config,
- bool task_queue_controller,
- NetworkControllerFactoryInterface* controller_factory) {
- if (task_queue_controller) {
- RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC";
- return absl::make_unique<webrtc::webrtc_cc::SendSideCongestionController>(
- clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps,
- bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps,
- controller_factory);
+const int64_t PacerQueueUpdateIntervalMs = 25;
+
+TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
+ int max_bitrate_bps,
+ int start_bitrate_bps,
+ const Clock* clock) {
+ TargetRateConstraints msg;
+ msg.at_time = Timestamp::ms(clock->TimeInMilliseconds());
+ msg.min_data_rate =
+ min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero();
+ msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps)
+ : DataRate::Infinity();
+ if (start_bitrate_bps > 0)
+ msg.starting_rate = DataRate::bps(start_bitrate_bps);
+ return msg;
+}
+
+TargetRateConstraints ConvertConstraints(const BitrateConstraints& contraints,
+ const Clock* clock) {
+ return ConvertConstraints(contraints.min_bitrate_bps,
+ contraints.max_bitrate_bps,
+ contraints.start_bitrate_bps, clock);
+}
+
+// The template closure pattern is based on rtc::ClosureTask.
+template <class Closure>
+class PeriodicTaskImpl final : public RtpTransportControllerSend::PeriodicTask {
+ public:
+ PeriodicTaskImpl(rtc::TaskQueue* task_queue,
+ int64_t period_ms,
+ Closure&& closure)
+ : task_queue_(task_queue),
+ period_ms_(period_ms),
+ closure_(std::forward<Closure>(closure)) {}
+ bool Run() override {
+ if (!running_)
+ return true;
+ closure_();
+ // absl::WrapUnique lets us repost this task on the TaskQueue.
+ task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_);
+ // Return false to tell TaskQueue to not destruct this object, since we have
+ // taken ownership with absl::WrapUnique.
+ return false;
}
- RTC_LOG(LS_INFO) << "Using Legacy SSCC";
- auto cc = absl::make_unique<webrtc::SendSideCongestionController>(
- clock, nullptr /* observer */, event_log, pacer);
- cc->SignalNetworkState(kNetworkDown);
- cc->SetBweBitrates(bitrate_config.min_bitrate_bps,
- bitrate_config.start_bitrate_bps,
- bitrate_config.max_bitrate_bps);
- return std::move(cc);
+ void Stop() override {
+ if (task_queue_->IsCurrent()) {
+ RTC_DCHECK(running_);
+ running_ = false;
+ } else {
+ task_queue_->PostTask([this] { Stop(); });
+ }
+ }
+
+ private:
+ rtc::TaskQueue* const task_queue_;
+ const int64_t period_ms_;
+ typename std::remove_const<
+ typename std::remove_reference<Closure>::type>::type closure_;
+ bool running_ = true;
+};
+
+template <class Closure>
+static RtpTransportControllerSend::PeriodicTask* StartPeriodicTask(
+ rtc::TaskQueue* task_queue,
+ int64_t period_ms,
+ Closure&& closure) {
+ auto periodic_task = absl::make_unique<PeriodicTaskImpl<Closure>>(
+ task_queue, period_ms, std::forward<Closure>(closure));
+ RtpTransportControllerSend::PeriodicTask* periodic_task_ptr =
+ periodic_task.get();
+ task_queue->PostDelayedTask(std::move(periodic_task), period_ms);
+ return periodic_task_ptr;
}
} // namespace
@@ -69,21 +120,35 @@
bitrate_configurator_(bitrate_config),
process_thread_(ProcessThread::Create("SendControllerThread")),
observer_(nullptr),
+ transport_feedback_adapter_(clock_),
+ controller_factory_override_(controller_factory),
+ controller_factory_fallback_(
+ absl::make_unique<GoogCcNetworkControllerFactory>(event_log)),
+ process_interval_(controller_factory_fallback_->GetProcessInterval()),
+ last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
+ reset_feedback_on_route_change_(
+ !field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")),
+ send_side_bwe_with_overhead_(
+ webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
+ transport_overhead_bytes_per_packet_(0),
+ network_available_(false),
+ periodic_tasks_enabled_(true),
+ packet_feedback_available_(false),
+ pacer_queue_update_task_(nullptr),
+ controller_task_(nullptr),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
task_queue_("rtp_send_controller") {
- // Created after task_queue to be able to post to the task queue internally.
- send_side_cc_ = CreateController(
- clock, &task_queue_, event_log, &pacer_, bitrate_config,
- !field_trial::IsDisabled(kTaskQueueExperiment), controller_factory);
+ initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
+ RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
+
+ pacer_.SetEstimatedBitrate(bitrate_config.start_bitrate_bps);
process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
- process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
process_thread_->Start();
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
process_thread_->Stop();
- process_thread_->DeRegisterModule(send_side_cc_.get());
process_thread_->DeRegisterModule(&pacer_);
}
@@ -126,34 +191,21 @@
uint8_t fraction_loss,
int64_t rtt_ms,
int64_t probing_interval_ms) {
- // TODO(srte): Skip this step when old SendSideCongestionController is
- // deprecated.
TargetTransferRate msg;
msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
msg.target_rate = DataRate::bps(bitrate_bps);
- msg.network_estimate.at_time = msg.at_time;
- msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms);
- uint32_t bandwidth_bps;
- if (send_side_cc_->AvailableBandwidth(&bandwidth_bps))
- msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps);
- msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0;
- msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms);
+ // TODO(srte): Remove this interface and push information about bandwidth
+ // estimation to users of this class, thereby reducing synchronous calls.
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ RTC_DCHECK(control_handler_->last_transfer_rate().has_value());
+ msg.network_estimate =
+ control_handler_->last_transfer_rate()->network_estimate;
- retransmission_rate_limiter_.SetMaxRate(bandwidth_bps);
+ retransmission_rate_limiter_.SetMaxRate(msg.network_estimate.bandwidth.bps());
- if (!task_queue_.IsCurrent()) {
- task_queue_.PostTask([this, msg] {
- rtc::CritScope cs(&observer_crit_);
- // We won't register as observer until we have an observers.
- RTC_DCHECK(observer_ != nullptr);
- observer_->OnTargetTransferRate(msg);
- });
- } else {
- rtc::CritScope cs(&observer_crit_);
- // We won't register as observer until we have an observers.
- RTC_DCHECK(observer_ != nullptr);
- observer_->OnTargetTransferRate(msg);
- }
+ // We won't register as observer until we have an observers.
+ RTC_DCHECK(observer_ != nullptr);
+ observer_->OnTargetTransferRate(msg);
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@@ -166,7 +218,7 @@
TransportFeedbackObserver*
RtpTransportControllerSend::transport_feedback_observer() {
- return send_side_cc_.get();
+ return this;
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
@@ -181,8 +233,12 @@
int min_send_bitrate_bps,
int max_padding_bitrate_bps,
int max_total_bitrate_bps) {
- send_side_cc_->SetAllocatedSendBitrateLimits(
- min_send_bitrate_bps, max_padding_bitrate_bps, max_total_bitrate_bps);
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
+ streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
+ streams_config_.max_total_allocated_bitrate =
+ DataRate::bps(max_total_bitrate_bps);
+ UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetKeepAliveConfig(
@@ -190,31 +246,33 @@
keepalive_ = config;
}
void RtpTransportControllerSend::SetPacingFactor(float pacing_factor) {
- send_side_cc_->SetPacingFactor(pacing_factor);
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ streams_config_.pacing_factor = pacing_factor;
+ UpdateStreamsConfig();
}
void RtpTransportControllerSend::SetQueueTimeLimit(int limit_ms) {
pacer_.SetQueueTimeLimit(limit_ms);
}
CallStatsObserver* RtpTransportControllerSend::GetCallStatsObserver() {
- return send_side_cc_.get();
+ return this;
}
void RtpTransportControllerSend::RegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
- send_side_cc_->RegisterPacketFeedbackObserver(observer);
+ transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer);
}
void RtpTransportControllerSend::DeRegisterPacketFeedbackObserver(
PacketFeedbackObserver* observer) {
- send_side_cc_->DeRegisterPacketFeedbackObserver(observer);
+ transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer);
}
void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
TargetTransferRateObserver* observer) {
- {
- rtc::CritScope cs(&observer_crit_);
+ task_queue_.PostTask([this, observer] {
+ RTC_DCHECK_RUN_ON(&task_queue_);
RTC_DCHECK(observer_ == nullptr);
observer_ = observer;
- }
- send_side_cc_->RegisterNetworkObserver(this);
+ MaybeCreateControllers();
+ });
}
void RtpTransportControllerSend::OnNetworkRouteChanged(
const std::string& transport_name,
@@ -252,20 +310,49 @@
<< " bps, max: " << bitrate_config.max_bitrate_bps
<< " bps.";
RTC_DCHECK_GT(bitrate_config.start_bitrate_bps, 0);
- send_side_cc_->OnNetworkRouteChanged(
- network_route, bitrate_config.start_bitrate_bps,
- bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps);
+
+ if (reset_feedback_on_route_change_)
+ transport_feedback_adapter_.SetNetworkIds(
+ network_route.local_network_id, network_route.remote_network_id);
+ transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
+
+ NetworkRouteChange msg;
+ msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
+ msg.constraints = ConvertConstraints(bitrate_config, clock_);
+ task_queue_.PostTask([this, msg] {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_) {
+ control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
+ } else {
+ UpdateInitialConstraints(msg.constraints);
+ }
+ pacer_.UpdateOutstandingData(0);
+ });
}
}
void RtpTransportControllerSend::OnNetworkAvailability(bool network_available) {
- send_side_cc_->SignalNetworkState(network_available ? kNetworkUp
- : kNetworkDown);
+ RTC_LOG(LS_INFO) << "SignalNetworkState "
+ << (network_available ? "Up" : "Down");
+ NetworkAvailability msg;
+ msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
+ msg.network_available = network_available;
+ task_queue_.PostTask([this, msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ network_available_ = msg.network_available;
+ if (controller_) {
+ control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
+ control_handler_->OnNetworkAvailability(msg);
+ } else {
+ MaybeCreateControllers();
+ }
+ });
+
for (auto& rtp_sender : video_rtp_senders_) {
rtp_sender->OnNetworkAvailability(network_available);
}
}
RtcpBandwidthObserver* RtpTransportControllerSend::GetBandwidthObserver() {
- return send_side_cc_->GetBandwidthObserver();
+ return this;
}
int64_t RtpTransportControllerSend::GetPacerQueuingDelayMs() const {
return pacer_.QueueInMs();
@@ -274,14 +361,30 @@
return pacer_.FirstSentPacketTimeMs();
}
void RtpTransportControllerSend::SetPerPacketFeedbackAvailable(bool available) {
- send_side_cc_->SetPerPacketFeedbackAvailable(available);
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ packet_feedback_available_ = available;
+ if (!controller_)
+ MaybeCreateControllers();
}
void RtpTransportControllerSend::EnablePeriodicAlrProbing(bool enable) {
- send_side_cc_->EnablePeriodicAlrProbing(enable);
+ task_queue_.PostTask([this, enable]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ streams_config_.requests_alr_probing = enable;
+ UpdateStreamsConfig();
+ });
}
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
- send_side_cc_->OnSentPacket(sent_packet);
+ absl::optional<SentPacket> packet_msg =
+ transport_feedback_adapter_.ProcessSentPacket(sent_packet);
+ if (packet_msg) {
+ task_queue_.PostTask([this, packet_msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_)
+ control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg));
+ });
+ }
+ MaybeUpdateOutstandingData();
}
void RtpTransportControllerSend::SetSdpBitrateParameters(
@@ -289,9 +392,16 @@
absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithSdpParameters(constraints);
if (updated.has_value()) {
- send_side_cc_->SetBweBitrates(updated->min_bitrate_bps,
- updated->start_bitrate_bps,
- updated->max_bitrate_bps);
+ TargetRateConstraints msg = ConvertConstraints(*updated, clock_);
+ task_queue_.PostTask([this, msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_) {
+ control_handler_->PostUpdates(
+ controller_->OnTargetRateConstraints(msg));
+ } else {
+ UpdateInitialConstraints(msg);
+ }
+ });
} else {
RTC_LOG(LS_VERBOSE)
<< "WebRTC.RtpTransportControllerSend.SetSdpBitrateParameters: "
@@ -304,9 +414,16 @@
absl::optional<BitrateConstraints> updated =
bitrate_configurator_.UpdateWithClientPreferences(preferences);
if (updated.has_value()) {
- send_side_cc_->SetBweBitrates(updated->min_bitrate_bps,
- updated->start_bitrate_bps,
- updated->max_bitrate_bps);
+ TargetRateConstraints msg = ConvertConstraints(*updated, clock_);
+ task_queue_.PostTask([this, msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_) {
+ control_handler_->PostUpdates(
+ controller_->OnTargetRateConstraints(msg));
+ } else {
+ UpdateInitialConstraints(msg);
+ }
+ });
} else {
RTC_LOG(LS_VERBOSE)
<< "WebRTC.RtpTransportControllerSend.SetClientBitratePreferences: "
@@ -321,7 +438,12 @@
if (field_trial::IsEnabled("WebRTC-Audio-ABWENoTWCC")) {
// TODO(srte): Make sure it's safe to always report this and remove the
// field trial check.
- send_side_cc_->SetAllocatedBitrateWithoutFeedback(bitrate_bps);
+ task_queue_.PostTask([this, bitrate_bps]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ streams_config_.unacknowledged_rate_allocation =
+ DataRate::bps(bitrate_bps);
+ UpdateStreamsConfig();
+ });
}
}
@@ -339,4 +461,215 @@
transport_overhead_bytes_per_packet);
}
}
+
+void RtpTransportControllerSend::OnReceivedEstimatedBitrate(uint32_t bitrate) {
+ RemoteBitrateReport msg;
+ msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
+ msg.bandwidth = DataRate::bps(bitrate);
+ task_queue_.PostTask([this, msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_)
+ control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
+ });
+}
+
+void RtpTransportControllerSend::OnReceivedRtcpReceiverReport(
+ const ReportBlockList& report_blocks,
+ int64_t rtt_ms,
+ int64_t now_ms) {
+ task_queue_.PostTask([this, report_blocks, now_ms]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
+ });
+
+ task_queue_.PostTask([this, now_ms, rtt_ms]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ RoundTripTimeUpdate report;
+ report.receive_time = Timestamp::ms(now_ms);
+ report.round_trip_time = TimeDelta::ms(rtt_ms);
+ report.smoothed = false;
+ if (controller_)
+ control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
+ });
+}
+
+void RtpTransportControllerSend::AddPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ size_t length,
+ const PacedPacketInfo& pacing_info) {
+ if (send_side_bwe_with_overhead_) {
+ length += transport_overhead_bytes_per_packet_;
+ }
+ transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length,
+ pacing_info);
+}
+
+void RtpTransportControllerSend::OnTransportFeedback(
+ const rtcp::TransportFeedback& feedback) {
+ RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
+
+ absl::optional<TransportPacketsFeedback> feedback_msg =
+ transport_feedback_adapter_.ProcessTransportFeedback(feedback);
+ if (feedback_msg) {
+ task_queue_.PostTask([this, feedback_msg]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_)
+ control_handler_->PostUpdates(
+ controller_->OnTransportPacketsFeedback(*feedback_msg));
+ });
+ }
+ MaybeUpdateOutstandingData();
+}
+
+void RtpTransportControllerSend::OnRttUpdate(int64_t avg_rtt_ms,
+ int64_t max_rtt_ms) {
+ int64_t now_ms = clock_->TimeInMilliseconds();
+ RoundTripTimeUpdate report;
+ report.receive_time = Timestamp::ms(now_ms);
+ report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
+ report.smoothed = true;
+ task_queue_.PostTask([this, report]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (controller_)
+ control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
+ });
+}
+
+void RtpTransportControllerSend::MaybeCreateControllers() {
+ RTC_DCHECK(!controller_);
+ RTC_DCHECK(!control_handler_);
+
+ if (!network_available_ || !observer_)
+ return;
+ control_handler_ = absl::make_unique<CongestionControlHandler>(this, &pacer_);
+
+ initial_config_.constraints.at_time =
+ Timestamp::ms(clock_->TimeInMilliseconds());
+ initial_config_.stream_based_config = streams_config_;
+
+ // TODO(srte): Use fallback controller if no feedback is available.
+ if (controller_factory_override_) {
+ RTC_LOG(LS_INFO) << "Creating overridden congestion controller";
+ controller_ = controller_factory_override_->Create(initial_config_);
+ process_interval_ = controller_factory_override_->GetProcessInterval();
+ } else {
+ RTC_LOG(LS_INFO) << "Creating fallback congestion controller";
+ controller_ = controller_factory_fallback_->Create(initial_config_);
+ process_interval_ = controller_factory_fallback_->GetProcessInterval();
+ }
+ UpdateControllerWithTimeInterval();
+ StartProcessPeriodicTasks();
+}
+
+void RtpTransportControllerSend::UpdateInitialConstraints(
+ TargetRateConstraints new_contraints) {
+ if (!new_contraints.starting_rate)
+ new_contraints.starting_rate = initial_config_.constraints.starting_rate;
+ RTC_DCHECK(new_contraints.starting_rate);
+ initial_config_.constraints = new_contraints;
+}
+
+void RtpTransportControllerSend::StartProcessPeriodicTasks() {
+ if (!periodic_tasks_enabled_)
+ return;
+ if (!pacer_queue_update_task_) {
+ pacer_queue_update_task_ =
+ StartPeriodicTask(&task_queue_, PacerQueueUpdateIntervalMs, [this]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ UpdatePacerQueue();
+ });
+ }
+ if (controller_task_) {
+ // Stop is not synchronous, but is guaranteed to occur before the first
+ // invocation of the new controller task started below.
+ controller_task_->Stop();
+ controller_task_ = nullptr;
+ }
+ if (process_interval_.IsFinite()) {
+ // The controller task is owned by the task queue and lives until the task
+ // queue is destroyed or some time after Stop() is called, whichever comes
+ // first.
+ controller_task_ =
+ StartPeriodicTask(&task_queue_, process_interval_.ms(), [this]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ UpdateControllerWithTimeInterval();
+ });
+ }
+}
+
+void RtpTransportControllerSend::UpdateControllerWithTimeInterval() {
+ if (controller_) {
+ ProcessInterval msg;
+ msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
+ control_handler_->PostUpdates(controller_->OnProcessInterval(msg));
+ }
+}
+
+void RtpTransportControllerSend::UpdatePacerQueue() {
+ if (control_handler_) {
+ TimeDelta expected_queue_time = TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
+ control_handler_->OnPacerQueueUpdate(expected_queue_time);
+ }
+}
+
+void RtpTransportControllerSend::MaybeUpdateOutstandingData() {
+ DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
+ task_queue_.PostTask([this, in_flight_data]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ if (control_handler_)
+ control_handler_->OnOutstandingData(in_flight_data);
+ });
+}
+
+void RtpTransportControllerSend::UpdateStreamsConfig() {
+ streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
+ if (controller_)
+ control_handler_->PostUpdates(
+ controller_->OnStreamsConfig(streams_config_));
+}
+
+void RtpTransportControllerSend::OnReceivedRtcpReceiverReportBlocks(
+ const ReportBlockList& report_blocks,
+ int64_t now_ms) {
+ if (report_blocks.empty())
+ return;
+
+ int total_packets_lost_delta = 0;
+ int total_packets_delta = 0;
+
+ // Compute the packet loss from all report blocks.
+ for (const RTCPReportBlock& report_block : report_blocks) {
+ auto it = last_report_blocks_.find(report_block.source_ssrc);
+ if (it != last_report_blocks_.end()) {
+ auto number_of_packets = report_block.extended_highest_sequence_number -
+ it->second.extended_highest_sequence_number;
+ total_packets_delta += number_of_packets;
+ auto lost_delta = report_block.packets_lost - it->second.packets_lost;
+ total_packets_lost_delta += lost_delta;
+ }
+ last_report_blocks_[report_block.source_ssrc] = report_block;
+ }
+ // Can only compute delta if there has been previous blocks to compare to. If
+ // not, total_packets_delta will be unchanged and there's nothing more to do.
+ if (!total_packets_delta)
+ return;
+ int packets_received_delta = total_packets_delta - total_packets_lost_delta;
+ // To detect lost packets, at least one packet has to be received. This check
+ // is needed to avoid bandwith detection update in
+ // VideoSendStreamTest.SuspendBelowMinBitrate
+
+ if (packets_received_delta < 1)
+ return;
+ Timestamp now = Timestamp::ms(now_ms);
+ TransportLossReport msg;
+ msg.packets_lost_delta = total_packets_lost_delta;
+ msg.packets_received_delta = packets_received_delta;
+ msg.receive_time = now;
+ msg.start_time = last_report_block_time_;
+ msg.end_time = now;
+ if (controller_)
+ control_handler_->PostUpdates(controller_->OnTransportLossReport(msg));
+ last_report_block_time_ = now;
+}
+
} // namespace webrtc
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index f1c46cc..8406ca4 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -11,6 +11,7 @@
#ifndef CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_
#define CALL_RTP_TRANSPORT_CONTROLLER_SEND_H_
+#include <atomic>
#include <map>
#include <memory>
#include <string>
@@ -20,15 +21,17 @@
#include "call/rtp_bitrate_configurator.h"
#include "call/rtp_transport_controller_send_interface.h"
#include "call/rtp_video_sender.h"
-#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h"
+#include "modules/congestion_controller/include/network_changed_observer.h"
+#include "modules/congestion_controller/rtp/control_handler.h"
+#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructormagic.h"
#include "rtc_base/networkroute.h"
+#include "rtc_base/race_checker.h"
#include "rtc_base/task_queue.h"
namespace webrtc {
-
class Clock;
class FrameEncryptorInterface;
class RtcEventLog;
@@ -38,7 +41,10 @@
// per transport, sharing the same congestion controller.
class RtpTransportControllerSend final
: public RtpTransportControllerSendInterface,
- public NetworkChangedObserver {
+ public NetworkChangedObserver,
+ public RtcpBandwidthObserver,
+ public CallStatsObserver,
+ public TransportFeedbackObserver {
public:
RtpTransportControllerSend(
Clock* clock,
@@ -108,7 +114,39 @@
void OnTransportOverheadChanged(
size_t transport_overhead_per_packet) override;
+ // Implements RtcpBandwidthObserver interface
+ void OnReceivedEstimatedBitrate(uint32_t bitrate) override;
+ void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
+ int64_t rtt,
+ int64_t now_ms) override;
+
+ // Implements TransportFeedbackObserver interface
+ void AddPacket(uint32_t ssrc,
+ uint16_t sequence_number,
+ size_t length,
+ const PacedPacketInfo& pacing_info) override;
+ void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override;
+
+ // Implements CallStatsObserver interface
+ void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override;
+
+ class PeriodicTask;
+
private:
+ void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
+ void UpdateInitialConstraints(TargetRateConstraints new_contraints)
+ RTC_RUN_ON(task_queue_);
+
+ void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
+ void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
+ void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
+
+ void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
+ void MaybeUpdateOutstandingData();
+ void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
+ int64_t now_ms)
+ RTC_RUN_ON(task_queue_);
+
const Clock* const clock_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
@@ -117,9 +155,49 @@
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
- rtc::CriticalSection observer_crit_;
- TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_);
- std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
+
+ TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
+
+ // TODO(srte): Move all access to feedback adapter to task queue.
+ TransportFeedbackAdapter transport_feedback_adapter_;
+
+ NetworkControllerFactoryInterface* const controller_factory_override_
+ RTC_PT_GUARDED_BY(task_queue_);
+ const std::unique_ptr<NetworkControllerFactoryInterface>
+ controller_factory_fallback_ RTC_PT_GUARDED_BY(task_queue_);
+
+ std::unique_ptr<CongestionControlHandler> control_handler_
+ RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_);
+
+ std::unique_ptr<NetworkControllerInterface> controller_
+ RTC_GUARDED_BY(task_queue_) RTC_PT_GUARDED_BY(task_queue_);
+
+ TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
+
+ std::map<uint32_t, RTCPReportBlock> last_report_blocks_
+ RTC_GUARDED_BY(task_queue_);
+ Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
+
+ NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
+ StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
+
+ const bool reset_feedback_on_route_change_;
+ const bool send_side_bwe_with_overhead_;
+ // Transport overhead is written by OnNetworkRouteChanged and read by
+ // AddPacket.
+ // TODO(srte): Remove atomic when feedback adapter runs on task queue.
+ std::atomic<size_t> transport_overhead_bytes_per_packet_;
+ bool network_available_ RTC_GUARDED_BY(task_queue_);
+ bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
+ bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
+ PeriodicTask* pacer_queue_update_task_ RTC_GUARDED_BY(task_queue_)
+ RTC_PT_GUARDED_BY(task_queue_);
+ PeriodicTask* controller_task_ RTC_GUARDED_BY(task_queue_)
+ RTC_PT_GUARDED_BY(task_queue_);
+ // Protects access to last_packet_feedback_vector_ in feedback adapter.
+ // TODO(srte): Remove this checker when feedback adapter runs on task queue.
+ rtc::RaceChecker worker_race_;
+
RateLimiter retransmission_rate_limiter_;
// TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.
diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn
index bd4e128..9bf78bc 100644
--- a/modules/congestion_controller/rtp/BUILD.gn
+++ b/modules/congestion_controller/rtp/BUILD.gn
@@ -16,47 +16,6 @@
}
}
-rtc_static_library("congestion_controller") {
- visibility = [ "*" ]
- configs += [ ":bwe_test_logging" ]
- sources = [
- "include/send_side_congestion_controller.h",
- "send_side_congestion_controller.cc",
- ]
-
- if (!build_with_chromium && is_clang) {
- # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
- suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
- }
-
- deps = [
- ":control_handler",
- ":transport_feedback",
- "../:congestion_controller",
- "../..:module_api",
- "../../..:webrtc_common",
- "../../../api/transport:goog_cc",
- "../../../api/transport:network_control",
- "../../../rtc_base:checks",
- "../../../rtc_base:rate_limiter",
- "../../../rtc_base:rtc_task_queue_api",
- "../../../rtc_base:safe_minmax",
- "../../../rtc_base:sequenced_task_checker",
- "../../../rtc_base/experiments:congestion_controller_experiment",
- "../../../rtc_base/network:sent_packet",
- "../../../system_wrappers",
- "../../../system_wrappers:field_trial",
- "../../pacing",
- "../../remote_bitrate_estimator",
- "../../rtp_rtcp:rtp_rtcp_format",
- "//third_party/abseil-cpp/absl/memory",
- ]
-
- if (!build_with_mozilla) {
- deps += [ "../../../rtc_base:rtc_base" ]
- }
-}
-
rtc_source_set("control_handler") {
visibility = [ "*" ]
sources = [
@@ -122,12 +81,10 @@
sources = [
"congestion_controller_unittests_helper.cc",
"congestion_controller_unittests_helper.h",
- "send_side_congestion_controller_unittest.cc",
"send_time_history_unittest.cc",
"transport_feedback_adapter_unittest.cc",
]
deps = [
- ":congestion_controller",
":transport_feedback",
"../:congestion_controller",
"../:mock_congestion_controller",
diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h
deleted file mode 100644
index 55465cb..0000000
--- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_
-#define MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_
-
-#include <atomic>
-#include <functional>
-#include <map>
-#include <memory>
-#include <vector>
-
-#include "api/transport/network_control.h"
-#include "api/transport/network_types.h"
-#include "common_types.h" // NOLINT(build/include)
-#include "modules/congestion_controller/include/send_side_congestion_controller_interface.h"
-#include "modules/congestion_controller/rtp/control_handler.h"
-#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
-#include "modules/include/module.h"
-#include "modules/include/module_common_types.h"
-#include "modules/pacing/paced_sender.h"
-#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
-#include "rtc_base/constructormagic.h"
-#include "rtc_base/criticalsection.h"
-#include "rtc_base/networkroute.h"
-#include "rtc_base/race_checker.h"
-#include "rtc_base/task_queue.h"
-
-namespace rtc {
-struct SentPacket;
-}
-
-namespace webrtc {
-
-class Clock;
-class RtcEventLog;
-
-namespace webrtc_cc {
-
-namespace send_side_cc_internal {
-
-// TODO(srte): Make sure the PeriodicTask implementation is reusable and move it
-// to task_queue.h.
-class PeriodicTask : public rtc::QueuedTask {
- public:
- virtual void Stop() = 0;
-};
-} // namespace send_side_cc_internal
-
-class SendSideCongestionController
- : public SendSideCongestionControllerInterface,
- public RtcpBandwidthObserver {
- public:
- SendSideCongestionController(
- const Clock* clock,
- rtc::TaskQueue* task_queue,
- RtcEventLog* event_log,
- PacedSender* pacer,
- int start_bitrate_bps,
- int min_bitrate_bps,
- int max_bitrate_bps,
- NetworkControllerFactoryInterface* controller_factory);
-
- ~SendSideCongestionController() override;
-
- void RegisterPacketFeedbackObserver(
- PacketFeedbackObserver* observer) override;
- void DeRegisterPacketFeedbackObserver(
- PacketFeedbackObserver* observer) override;
-
- // Currently, there can be at most one observer.
- // TODO(nisse): The RegisterNetworkObserver method is needed because we first
- // construct this object (as part of RtpTransportControllerSend), then pass a
- // reference to Call, which then registers itself as the observer. We should
- // try to break this circular chain of references, and make the observer a
- // construction time constant.
- void RegisterNetworkObserver(NetworkChangedObserver* observer) override;
-
- void SetBweBitrates(int min_bitrate_bps,
- int start_bitrate_bps,
- int max_bitrate_bps) override;
-
- void SetAllocatedSendBitrateLimits(int64_t min_send_bitrate_bps,
- int64_t max_padding_bitrate_bps,
- int64_t max_total_bitrate_bps) override;
-
- // Resets the BWE state. Note the first argument is the bitrate_bps.
- void OnNetworkRouteChanged(const rtc::NetworkRoute& network_route,
- int bitrate_bps,
- int min_bitrate_bps,
- int max_bitrate_bps) override;
- void SignalNetworkState(NetworkState state) override;
-
- RtcpBandwidthObserver* GetBandwidthObserver() override;
-
- bool AvailableBandwidth(uint32_t* bandwidth) const override;
-
- TransportFeedbackObserver* GetTransportFeedbackObserver() override;
-
- void SetPerPacketFeedbackAvailable(bool available) override;
- void EnablePeriodicAlrProbing(bool enable) override;
-
- void OnSentPacket(const rtc::SentPacket& sent_packet) override;
-
- // Implements RtcpBandwidthObserver
- void OnReceivedEstimatedBitrate(uint32_t bitrate) override;
- void OnReceivedRtcpReceiverReport(const ReportBlockList& report_blocks,
- int64_t rtt,
- int64_t now_ms) override;
- // Implements CallStatsObserver.
- void OnRttUpdate(int64_t avg_rtt_ms, int64_t max_rtt_ms) override;
-
- // Implements Module.
- int64_t TimeUntilNextProcess() override;
- void Process() override;
-
- // Implements TransportFeedbackObserver.
- void AddPacket(uint32_t ssrc,
- uint16_t sequence_number,
- size_t length,
- const PacedPacketInfo& pacing_info) override;
- void OnTransportFeedback(const rtcp::TransportFeedback& feedback) override;
-
- std::vector<PacketFeedback> GetTransportFeedbackVector() const;
-
- void SetPacingFactor(float pacing_factor) override;
-
- void SetAllocatedBitrateWithoutFeedback(uint32_t bitrate_bps) override;
-
- protected:
- // TODO(srte): The tests should be rewritten to not depend on internals and
- // these functions should be removed.
- // Since we can't control the timing of the internal task queue, this method
- // is used in unit tests to stop the periodic tasks from running unless
- // PostPeriodicTasksForTest is called.
- void DisablePeriodicTasks();
- // Post periodic tasks just once. This allows unit tests to trigger process
- // updates immediately.
- void PostPeriodicTasksForTest();
- // Waits for outstanding tasks to be finished. This allos unit tests to ensure
- // that expected callbacks has been called.
- void WaitOnTasksForTest();
-
- private:
- void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
- void MaybeRecreateControllers() RTC_RUN_ON(task_queue_);
- void UpdateInitialConstraints(TargetRateConstraints new_contraints)
- RTC_RUN_ON(task_queue_);
-
- void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
- void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
- void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
-
- void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
- void MaybeUpdateOutstandingData();
- void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
- int64_t now_ms)
- RTC_RUN_ON(task_queue_);
-
- const Clock* const clock_;
- // PacedSender is thread safe and doesn't need protection here.
- PacedSender* const pacer_;
- // TODO(srte): Move all access to feedback adapter to task queue.
- TransportFeedbackAdapter transport_feedback_adapter_;
-
- NetworkControllerFactoryInterface* const controller_factory_with_feedback_
- RTC_GUARDED_BY(task_queue_);
- const std::unique_ptr<NetworkControllerFactoryInterface>
- controller_factory_fallback_ RTC_GUARDED_BY(task_queue_);
-
- std::unique_ptr<CongestionControlHandler> control_handler_
- RTC_GUARDED_BY(task_queue_);
-
- std::unique_ptr<NetworkControllerInterface> controller_
- RTC_GUARDED_BY(task_queue_);
-
- TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
-
- std::map<uint32_t, RTCPReportBlock> last_report_blocks_
- RTC_GUARDED_BY(task_queue_);
- Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
-
- NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_);
- NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
- StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
-
- const bool reset_feedback_on_route_change_;
- const bool send_side_bwe_with_overhead_;
- // Transport overhead is written by OnNetworkRouteChanged and read by
- // AddPacket.
- // TODO(srte): Remove atomic when feedback adapter runs on task queue.
- std::atomic<size_t> transport_overhead_bytes_per_packet_;
- bool network_available_ RTC_GUARDED_BY(task_queue_);
- bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
- bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
- send_side_cc_internal::PeriodicTask* pacer_queue_update_task_
- RTC_GUARDED_BY(task_queue_);
- send_side_cc_internal::PeriodicTask* controller_task_
- RTC_GUARDED_BY(task_queue_);
-
- // Protects access to last_packet_feedback_vector_ in feedback adapter.
- // TODO(srte): Remove this checker when feedback adapter runs on task queue.
- rtc::RaceChecker worker_race_;
-
- rtc::TaskQueue* task_queue_;
-
- RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController);
-};
-} // namespace webrtc_cc
-} // namespace webrtc
-
-#endif // MODULES_CONGESTION_CONTROLLER_RTP_INCLUDE_SEND_SIDE_CONGESTION_CONTROLLER_H_
diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc
deleted file mode 100644
index 982380f..0000000
--- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc
+++ /dev/null
@@ -1,588 +0,0 @@
-/*
- * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
-
-#include <algorithm>
-#include <functional>
-#include <memory>
-#include <vector>
-#include "absl/memory/memory.h"
-#include "api/transport/goog_cc_factory.h"
-#include "api/transport/network_types.h"
-#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
-#include "rtc_base/bind.h"
-#include "rtc_base/checks.h"
-#include "rtc_base/event.h"
-#include "rtc_base/format_macros.h"
-#include "rtc_base/logging.h"
-#include "rtc_base/network/sent_packet.h"
-#include "rtc_base/numerics/safe_conversions.h"
-#include "rtc_base/numerics/safe_minmax.h"
-#include "rtc_base/rate_limiter.h"
-#include "rtc_base/sequenced_task_checker.h"
-#include "rtc_base/timeutils.h"
-#include "system_wrappers/include/field_trial.h"
-
-using absl::make_unique;
-
-namespace webrtc {
-namespace webrtc_cc {
-namespace {
-using send_side_cc_internal::PeriodicTask;
-const int64_t PacerQueueUpdateIntervalMs = 25;
-
-TargetRateConstraints ConvertConstraints(int min_bitrate_bps,
- int max_bitrate_bps,
- int start_bitrate_bps,
- const Clock* clock) {
- TargetRateConstraints msg;
- msg.at_time = Timestamp::ms(clock->TimeInMilliseconds());
- msg.min_data_rate =
- min_bitrate_bps >= 0 ? DataRate::bps(min_bitrate_bps) : DataRate::Zero();
- msg.max_data_rate = max_bitrate_bps > 0 ? DataRate::bps(max_bitrate_bps)
- : DataRate::Infinity();
- if (start_bitrate_bps > 0)
- msg.starting_rate = DataRate::bps(start_bitrate_bps);
- return msg;
-}
-
-// The template closure pattern is based on rtc::ClosureTask.
-template <class Closure>
-class PeriodicTaskImpl final : public PeriodicTask {
- public:
- PeriodicTaskImpl(rtc::TaskQueue* task_queue,
- int64_t period_ms,
- Closure&& closure)
- : task_queue_(task_queue),
- period_ms_(period_ms),
- closure_(std::forward<Closure>(closure)) {}
- bool Run() override {
- if (!running_)
- return true;
- closure_();
- // absl::WrapUnique lets us repost this task on the TaskQueue.
- task_queue_->PostDelayedTask(absl::WrapUnique(this), period_ms_);
- // Return false to tell TaskQueue to not destruct this object, since we have
- // taken ownership with absl::WrapUnique.
- return false;
- }
- void Stop() override {
- if (task_queue_->IsCurrent()) {
- RTC_DCHECK(running_);
- running_ = false;
- } else {
- task_queue_->PostTask([this] { Stop(); });
- }
- }
-
- private:
- rtc::TaskQueue* const task_queue_;
- const int64_t period_ms_;
- typename std::remove_const<
- typename std::remove_reference<Closure>::type>::type closure_;
- bool running_ = true;
-};
-
-template <class Closure>
-static PeriodicTask* StartPeriodicTask(rtc::TaskQueue* task_queue,
- int64_t period_ms,
- Closure&& closure) {
- auto periodic_task = absl::make_unique<PeriodicTaskImpl<Closure>>(
- task_queue, period_ms, std::forward<Closure>(closure));
- PeriodicTask* periodic_task_ptr = periodic_task.get();
- task_queue->PostDelayedTask(std::move(periodic_task), period_ms);
- return periodic_task_ptr;
-}
-
-} // namespace
-
-SendSideCongestionController::SendSideCongestionController(
- const Clock* clock,
- rtc::TaskQueue* task_queue,
- RtcEventLog* event_log,
- PacedSender* pacer,
- int start_bitrate_bps,
- int min_bitrate_bps,
- int max_bitrate_bps,
- NetworkControllerFactoryInterface* controller_factory)
- : clock_(clock),
- pacer_(pacer),
- transport_feedback_adapter_(clock_),
- controller_factory_with_feedback_(controller_factory),
- controller_factory_fallback_(
- absl::make_unique<GoogCcNetworkControllerFactory>(event_log)),
- process_interval_(controller_factory_fallback_->GetProcessInterval()),
- last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
- observer_(nullptr),
- reset_feedback_on_route_change_(
- !field_trial::IsEnabled("WebRTC-Bwe-NoFeedbackReset")),
- send_side_bwe_with_overhead_(
- webrtc::field_trial::IsEnabled("WebRTC-SendSideBwe-WithOverhead")),
- transport_overhead_bytes_per_packet_(0),
- network_available_(false),
- periodic_tasks_enabled_(true),
- packet_feedback_available_(false),
- pacer_queue_update_task_(nullptr),
- controller_task_(nullptr),
- task_queue_(task_queue) {
- initial_config_.constraints = ConvertConstraints(
- min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_);
- RTC_DCHECK(start_bitrate_bps > 0);
- // To be fully compatible with legacy SendSideCongestionController, make sure
- // pacer is initialized even if there are no registered streams. This should
- // not happen under normal circumstances, but some tests rely on it and there
- // are no checks detecting when the legacy SendSideCongestionController is
- // used. This way of setting the value has the drawback that it might be wrong
- // compared to what the actual value from the congestion controller will be.
- // TODO(srte): Remove this when the legacy SendSideCongestionController is
- // removed.
- pacer_->SetEstimatedBitrate(start_bitrate_bps);
-}
-
-// There is no point in having a network controller for a network that is not
-// yet available and if we don't have any observer of it's state.
-// MaybeCreateControllers is used to trigger creation if those things are
-// fulfilled. This is needed since dependent code uses the period until network
-// is signalled to be avaliabile to set the expected start bitrate which is sent
-// to the initializer for NetworkControllers. The observer is injected later due
-// to a circular dependency between RtpTransportControllerSend and Call.
-// TODO(srte): Break the circular dependency issue and make sure that starting
-// bandwidth is set before this class is initialized so the controllers can be
-// created in the constructor.
-void SendSideCongestionController::MaybeCreateControllers() {
- if (!controller_)
- MaybeRecreateControllers();
-}
-
-void SendSideCongestionController::MaybeRecreateControllers() {
- if (!network_available_ || !observer_)
- return;
- if (!control_handler_) {
- control_handler_ =
- absl::make_unique<CongestionControlHandler>(observer_, pacer_);
- }
-
- initial_config_.constraints.at_time =
- Timestamp::ms(clock_->TimeInMilliseconds());
- initial_config_.stream_based_config = streams_config_;
-
- if (!controller_) {
- // TODO(srte): Use fallback controller if no feedback is available.
- if (controller_factory_with_feedback_) {
- RTC_LOG(LS_INFO) << "Creating feedback based only controller";
- controller_ = controller_factory_with_feedback_->Create(initial_config_);
- process_interval_ =
- controller_factory_with_feedback_->GetProcessInterval();
- } else {
- RTC_LOG(LS_INFO) << "Creating fallback controller";
- controller_ = controller_factory_fallback_->Create(initial_config_);
- process_interval_ = controller_factory_fallback_->GetProcessInterval();
- }
- UpdateControllerWithTimeInterval();
- StartProcessPeriodicTasks();
- }
- RTC_DCHECK(controller_);
-}
-
-void SendSideCongestionController::UpdateInitialConstraints(
- TargetRateConstraints new_contraints) {
- if (!new_contraints.starting_rate)
- new_contraints.starting_rate = initial_config_.constraints.starting_rate;
- RTC_DCHECK(new_contraints.starting_rate);
- initial_config_.constraints = new_contraints;
-}
-
-SendSideCongestionController::~SendSideCongestionController() = default;
-
-void SendSideCongestionController::RegisterPacketFeedbackObserver(
- PacketFeedbackObserver* observer) {
- transport_feedback_adapter_.RegisterPacketFeedbackObserver(observer);
-}
-
-void SendSideCongestionController::DeRegisterPacketFeedbackObserver(
- PacketFeedbackObserver* observer) {
- transport_feedback_adapter_.DeRegisterPacketFeedbackObserver(observer);
-}
-
-void SendSideCongestionController::RegisterNetworkObserver(
- NetworkChangedObserver* observer) {
- task_queue_->PostTask([this, observer]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- RTC_DCHECK(observer_ == nullptr);
- observer_ = observer;
- MaybeCreateControllers();
- });
-}
-
-void SendSideCongestionController::SetBweBitrates(int min_bitrate_bps,
- int start_bitrate_bps,
- int max_bitrate_bps) {
- TargetRateConstraints constraints = ConvertConstraints(
- min_bitrate_bps, max_bitrate_bps, start_bitrate_bps, clock_);
- task_queue_->PostTask([this, constraints]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_) {
- control_handler_->PostUpdates(
- controller_->OnTargetRateConstraints(constraints));
- } else {
- UpdateInitialConstraints(constraints);
- }
- });
-}
-
-void SendSideCongestionController::SetAllocatedSendBitrateLimits(
- int64_t min_send_bitrate_bps,
- int64_t max_padding_bitrate_bps,
- int64_t max_total_bitrate_bps) {
- RTC_DCHECK_RUN_ON(task_queue_);
- streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
- streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
- streams_config_.max_total_allocated_bitrate =
- DataRate::bps(max_total_bitrate_bps);
- UpdateStreamsConfig();
-}
-
-// TODO(holmer): Split this up and use SetBweBitrates in combination with
-// OnNetworkRouteChanged.
-void SendSideCongestionController::OnNetworkRouteChanged(
- const rtc::NetworkRoute& network_route,
- int start_bitrate_bps,
- int min_bitrate_bps,
- int max_bitrate_bps) {
- if (reset_feedback_on_route_change_)
- transport_feedback_adapter_.SetNetworkIds(network_route.local_network_id,
- network_route.remote_network_id);
- transport_overhead_bytes_per_packet_ = network_route.packet_overhead;
-
- NetworkRouteChange msg;
- msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
- msg.constraints = ConvertConstraints(min_bitrate_bps, max_bitrate_bps,
- start_bitrate_bps, clock_);
-
- task_queue_->PostTask([this, msg]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_) {
- control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
- } else {
- UpdateInitialConstraints(msg.constraints);
- }
- pacer_->UpdateOutstandingData(0);
- });
-}
-
-bool SendSideCongestionController::AvailableBandwidth(
- uint32_t* bandwidth) const {
- // This is only called in the OnNetworkChanged callback in
- // RtpTransportControllerSend which is called from ControlHandler, which is
- // running on the task queue.
- // TODO(srte): Remove this function when RtpTransportControllerSend stops
- // calling it.
- RTC_DCHECK_RUN_ON(task_queue_);
- if (!control_handler_) {
- return false;
- }
- // TODO(srte): Remove this interface and push information about bandwidth
- // estimation to users of this class, thereby reducing synchronous calls.
- if (control_handler_->last_transfer_rate().has_value()) {
- *bandwidth = control_handler_->last_transfer_rate()
- ->network_estimate.bandwidth.bps();
- return true;
- }
- return false;
-}
-
-RtcpBandwidthObserver* SendSideCongestionController::GetBandwidthObserver() {
- return this;
-}
-
-void SendSideCongestionController::SetPerPacketFeedbackAvailable(
- bool available) {
- RTC_DCHECK_RUN_ON(task_queue_);
- packet_feedback_available_ = available;
- MaybeRecreateControllers();
-}
-
-void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
- task_queue_->PostTask([this, enable]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- streams_config_.requests_alr_probing = enable;
- UpdateStreamsConfig();
- });
-}
-
-void SendSideCongestionController::UpdateStreamsConfig() {
- streams_config_.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
- if (controller_)
- control_handler_->PostUpdates(
- controller_->OnStreamsConfig(streams_config_));
-}
-
-TransportFeedbackObserver*
-SendSideCongestionController::GetTransportFeedbackObserver() {
- return this;
-}
-
-void SendSideCongestionController::SignalNetworkState(NetworkState state) {
- RTC_LOG(LS_INFO) << "SignalNetworkState "
- << (state == kNetworkUp ? "Up" : "Down");
- NetworkAvailability msg;
- msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
- msg.network_available = state == kNetworkUp;
- task_queue_->PostTask([this, msg]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- network_available_ = msg.network_available;
- if (controller_) {
- control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
- control_handler_->OnNetworkAvailability(msg);
- } else {
- MaybeCreateControllers();
- }
- });
-}
-
-void SendSideCongestionController::OnSentPacket(
- const rtc::SentPacket& sent_packet) {
- absl::optional<SentPacket> packet_msg =
- transport_feedback_adapter_.ProcessSentPacket(sent_packet);
- if (packet_msg) {
- task_queue_->PostTask([this, packet_msg]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_)
- control_handler_->PostUpdates(controller_->OnSentPacket(*packet_msg));
- });
- }
- MaybeUpdateOutstandingData();
-}
-
-void SendSideCongestionController::OnRttUpdate(int64_t avg_rtt_ms,
- int64_t max_rtt_ms) {
- int64_t now_ms = clock_->TimeInMilliseconds();
- RoundTripTimeUpdate report;
- report.receive_time = Timestamp::ms(now_ms);
- report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
- report.smoothed = true;
- task_queue_->PostTask([this, report]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_)
- control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
- });
-}
-
-int64_t SendSideCongestionController::TimeUntilNextProcess() {
- // Using task queue to process, just sleep long to avoid wasting resources.
- return 60 * 1000;
-}
-
-void SendSideCongestionController::Process() {
- // Ignored, using task queue to process.
-}
-
-void SendSideCongestionController::StartProcessPeriodicTasks() {
- if (!periodic_tasks_enabled_)
- return;
- if (!pacer_queue_update_task_) {
- pacer_queue_update_task_ =
- StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- UpdatePacerQueue();
- });
- }
- if (controller_task_) {
- // Stop is not synchronous, but is guaranteed to occur before the first
- // invocation of the new controller task started below.
- controller_task_->Stop();
- controller_task_ = nullptr;
- }
- if (process_interval_.IsFinite()) {
- // The controller task is owned by the task queue and lives until the task
- // queue is destroyed or some time after Stop() is called, whichever comes
- // first.
- controller_task_ =
- StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- UpdateControllerWithTimeInterval();
- });
- }
-}
-
-void SendSideCongestionController::UpdateControllerWithTimeInterval() {
- if (controller_) {
- ProcessInterval msg;
- msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
- control_handler_->PostUpdates(controller_->OnProcessInterval(msg));
- }
-}
-
-void SendSideCongestionController::UpdatePacerQueue() {
- if (control_handler_) {
- TimeDelta expected_queue_time =
- TimeDelta::ms(pacer_->ExpectedQueueTimeMs());
- control_handler_->OnPacerQueueUpdate(expected_queue_time);
- }
-}
-
-void SendSideCongestionController::AddPacket(
- uint32_t ssrc,
- uint16_t sequence_number,
- size_t length,
- const PacedPacketInfo& pacing_info) {
- if (send_side_bwe_with_overhead_) {
- length += transport_overhead_bytes_per_packet_;
- }
- transport_feedback_adapter_.AddPacket(ssrc, sequence_number, length,
- pacing_info);
-}
-
-void SendSideCongestionController::OnTransportFeedback(
- const rtcp::TransportFeedback& feedback) {
- RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
-
- absl::optional<TransportPacketsFeedback> feedback_msg =
- transport_feedback_adapter_.ProcessTransportFeedback(feedback);
- if (feedback_msg) {
- task_queue_->PostTask([this, feedback_msg]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_)
- control_handler_->PostUpdates(
- controller_->OnTransportPacketsFeedback(*feedback_msg));
- });
- }
- MaybeUpdateOutstandingData();
-}
-
-void SendSideCongestionController::MaybeUpdateOutstandingData() {
- DataSize in_flight_data = transport_feedback_adapter_.GetOutstandingData();
- task_queue_->PostTask([this, in_flight_data]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (control_handler_)
- control_handler_->OnOutstandingData(in_flight_data);
- });
-}
-
-std::vector<PacketFeedback>
-SendSideCongestionController::GetTransportFeedbackVector() const {
- RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
- return transport_feedback_adapter_.GetTransportFeedbackVector();
-}
-
-void SendSideCongestionController::PostPeriodicTasksForTest() {
- task_queue_->PostTask([this]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- UpdateControllerWithTimeInterval();
- UpdatePacerQueue();
- });
-}
-
-void SendSideCongestionController::WaitOnTasksForTest() {
- rtc::Event event;
- task_queue_->PostTask([&event]() { event.Set(); });
- event.Wait(rtc::Event::kForever);
-}
-
-void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
- RTC_DCHECK_RUN_ON(task_queue_);
- streams_config_.pacing_factor = pacing_factor;
- UpdateStreamsConfig();
-}
-
-void SendSideCongestionController::SetAllocatedBitrateWithoutFeedback(
- uint32_t bitrate_bps) {
- task_queue_->PostTask([this, bitrate_bps]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- streams_config_.unacknowledged_rate_allocation = DataRate::bps(bitrate_bps);
- UpdateStreamsConfig();
- });
-}
-
-void SendSideCongestionController::DisablePeriodicTasks() {
- task_queue_->PostTask([this]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- periodic_tasks_enabled_ = false;
- });
-}
-
-void SendSideCongestionController::OnReceivedEstimatedBitrate(
- uint32_t bitrate) {
- RemoteBitrateReport msg;
- msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
- msg.bandwidth = DataRate::bps(bitrate);
- task_queue_->PostTask([this, msg]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- if (controller_)
- control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
- });
-}
-
-void SendSideCongestionController::OnReceivedRtcpReceiverReport(
- const webrtc::ReportBlockList& report_blocks,
- int64_t rtt_ms,
- int64_t now_ms) {
- task_queue_->PostTask([this, report_blocks, now_ms]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
- });
-
- task_queue_->PostTask([this, now_ms, rtt_ms]() {
- RTC_DCHECK_RUN_ON(task_queue_);
- RoundTripTimeUpdate report;
- report.receive_time = Timestamp::ms(now_ms);
- report.round_trip_time = TimeDelta::ms(rtt_ms);
- report.smoothed = false;
- if (controller_)
- control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
- });
-}
-
-void SendSideCongestionController::OnReceivedRtcpReceiverReportBlocks(
- const ReportBlockList& report_blocks,
- int64_t now_ms) {
- if (report_blocks.empty())
- return;
-
- int total_packets_lost_delta = 0;
- int total_packets_delta = 0;
-
- // Compute the packet loss from all report blocks.
- for (const RTCPReportBlock& report_block : report_blocks) {
- auto it = last_report_blocks_.find(report_block.source_ssrc);
- if (it != last_report_blocks_.end()) {
- auto number_of_packets = report_block.extended_highest_sequence_number -
- it->second.extended_highest_sequence_number;
- total_packets_delta += number_of_packets;
- auto lost_delta = report_block.packets_lost - it->second.packets_lost;
- total_packets_lost_delta += lost_delta;
- }
- last_report_blocks_[report_block.source_ssrc] = report_block;
- }
- // Can only compute delta if there has been previous blocks to compare to. If
- // not, total_packets_delta will be unchanged and there's nothing more to do.
- if (!total_packets_delta)
- return;
- int packets_received_delta = total_packets_delta - total_packets_lost_delta;
- // To detect lost packets, at least one packet has to be received. This check
- // is needed to avoid bandwith detection update in
- // VideoSendStreamTest.SuspendBelowMinBitrate
-
- if (packets_received_delta < 1)
- return;
- Timestamp now = Timestamp::ms(now_ms);
- TransportLossReport msg;
- msg.packets_lost_delta = total_packets_lost_delta;
- msg.packets_received_delta = packets_received_delta;
- msg.receive_time = now;
- msg.start_time = last_report_block_time_;
- msg.end_time = now;
- if (controller_)
- control_handler_->PostUpdates(controller_->OnTransportLossReport(msg));
- last_report_block_time_ = now;
-}
-} // namespace webrtc_cc
-} // namespace webrtc
diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc
deleted file mode 100644
index 64a5234..0000000
--- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc
+++ /dev/null
@@ -1,523 +0,0 @@
-/*
- * Copyright (c) 2016 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/congestion_controller/rtp/include/send_side_congestion_controller.h"
-#include "logging/rtc_event_log/mock/mock_rtc_event_log.h"
-#include "modules/congestion_controller/include/mock/mock_congestion_observer.h"
-#include "modules/congestion_controller/rtp/congestion_controller_unittests_helper.h"
-#include "modules/pacing/mock/mock_paced_sender.h"
-#include "modules/pacing/packet_router.h"
-#include "modules/remote_bitrate_estimator/include/bwe_defines.h"
-#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
-#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
-#include "rtc_base/network/sent_packet.h"
-#include "system_wrappers/include/clock.h"
-#include "test/field_trial.h"
-#include "test/gmock.h"
-#include "test/gtest.h"
-
-using testing::_;
-using testing::AtLeast;
-using testing::Ge;
-using testing::NiceMock;
-using testing::Return;
-using testing::SaveArg;
-using testing::StrictMock;
-
-namespace webrtc {
-namespace webrtc_cc {
-namespace test {
-
-namespace {
-using webrtc::test::MockCongestionObserver;
-const webrtc::PacedPacketInfo kPacingInfo0(0, 5, 2000);
-const webrtc::PacedPacketInfo kPacingInfo1(1, 8, 4000);
-
-const uint32_t kInitialBitrateBps = 60000;
-const float kDefaultPacingRate = 2.5f;
-
-class SendSideCongestionControllerForTest
- : public SendSideCongestionController {
- public:
- using SendSideCongestionController::SendSideCongestionController;
- ~SendSideCongestionControllerForTest() {}
- using SendSideCongestionController::DisablePeriodicTasks;
- void WaitOnTasks() { SendSideCongestionController::WaitOnTasksForTest(); }
- void Process() override {
- SendSideCongestionController::PostPeriodicTasksForTest();
- SendSideCongestionController::WaitOnTasksForTest();
- }
-};
-} // namespace
-
-class SendSideCongestionControllerTest : public ::testing::Test {
- protected:
- SendSideCongestionControllerTest()
- : clock_(123456),
- target_bitrate_observer_(this),
- bandwidth_observer_(nullptr) {}
- ~SendSideCongestionControllerTest() override {}
-
- void SetUp() override {
- pacer_.reset(new NiceMock<MockPacedSender>());
- // Set the initial bitrate estimate and expect the |observer| and |pacer_|
- // to be updated.
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- EXPECT_CALL(*pacer_,
- SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
- EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3));
- EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5));
- task_queue_ = absl::make_unique<rtc::TaskQueue>("SSCC Test");
- controller_.reset(new SendSideCongestionControllerForTest(
- &clock_, task_queue_.get(), &event_log_, pacer_.get(),
- kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr));
- controller_->DisablePeriodicTasks();
- controller_->RegisterNetworkObserver(&observer_);
- controller_->SignalNetworkState(NetworkState::kNetworkUp);
- bandwidth_observer_ = controller_->GetBandwidthObserver();
- controller_->WaitOnTasks();
- testing::Mock::VerifyAndClearExpectations(pacer_.get());
- testing::Mock::VerifyAndClearExpectations(&observer_);
- }
-
- void TearDown() override { controller_->WaitOnTasks(); }
-
- // Custom setup - use an observer that tracks the target bitrate, without
- // prescribing on which iterations it must change (like a mock would).
- void TargetBitrateTrackingSetup() {
- bandwidth_observer_ = nullptr;
- pacer_.reset(new NiceMock<MockPacedSender>());
- task_queue_ = absl::make_unique<rtc::TaskQueue>("SSCC Test");
- controller_.reset(new SendSideCongestionControllerForTest(
- &clock_, task_queue_.get(), &event_log_, pacer_.get(),
- kInitialBitrateBps, 0, 5 * kInitialBitrateBps, nullptr));
- controller_->DisablePeriodicTasks();
- controller_->RegisterNetworkObserver(&target_bitrate_observer_);
- controller_->SignalNetworkState(NetworkState::kNetworkUp);
- }
-
- void OnSentPacket(const PacketFeedback& packet_feedback) {
- constexpr uint32_t ssrc = 0;
- controller_->AddPacket(ssrc, packet_feedback.sequence_number,
- packet_feedback.payload_size,
- packet_feedback.pacing_info);
- rtc::PacketInfo packet_info;
- packet_info.included_in_feedback = true;
- controller_->OnSentPacket(rtc::SentPacket(packet_feedback.sequence_number,
- packet_feedback.send_time_ms,
- packet_info));
- }
-
- // Allows us to track the target bitrate, without prescribing the exact
- // iterations when this would hapen, like a mock would.
- class TargetBitrateObserver : public NetworkChangedObserver {
- public:
- explicit TargetBitrateObserver(SendSideCongestionControllerTest* owner)
- : owner_(owner) {}
- ~TargetBitrateObserver() override = default;
- void OnNetworkChanged(uint32_t bitrate_bps,
- uint8_t fraction_loss, // 0 - 255.
- int64_t rtt_ms,
- int64_t probing_interval_ms) override {
- owner_->target_bitrate_bps_ = bitrate_bps;
- }
-
- private:
- SendSideCongestionControllerTest* owner_;
- };
-
- void PacketTransmissionAndFeedbackBlock(uint16_t* seq_num,
- int64_t runtime_ms,
- int64_t delay) {
- int64_t delay_buildup = 0;
- int64_t start_time_ms = clock_.TimeInMilliseconds();
- while (clock_.TimeInMilliseconds() - start_time_ms < runtime_ms) {
- constexpr size_t kPayloadSize = 1000;
- PacketFeedback packet(clock_.TimeInMilliseconds() + delay_buildup,
- clock_.TimeInMilliseconds(), *seq_num, kPayloadSize,
- PacedPacketInfo());
- delay_buildup += delay; // Delay has to increase, or it's just RTT.
- OnSentPacket(packet);
- // Create expected feedback and send into adapter.
- std::unique_ptr<rtcp::TransportFeedback> feedback(
- new rtcp::TransportFeedback());
- feedback->SetBase(packet.sequence_number, packet.arrival_time_ms * 1000);
- EXPECT_TRUE(feedback->AddReceivedPacket(packet.sequence_number,
- packet.arrival_time_ms * 1000));
- rtc::Buffer raw_packet = feedback->Build();
- feedback = rtcp::TransportFeedback::ParseFrom(raw_packet.data(),
- raw_packet.size());
- EXPECT_TRUE(feedback.get() != nullptr);
- controller_->OnTransportFeedback(*feedback.get());
- clock_.AdvanceTimeMilliseconds(50);
- controller_->Process();
- ++(*seq_num);
- }
- }
-
- SimulatedClock clock_;
- StrictMock<MockCongestionObserver> observer_;
- TargetBitrateObserver target_bitrate_observer_;
- NiceMock<MockRtcEventLog> event_log_;
- RtcpBandwidthObserver* bandwidth_observer_;
- PacketRouter packet_router_;
- std::unique_ptr<NiceMock<MockPacedSender>> pacer_;
- std::unique_ptr<SendSideCongestionControllerForTest> controller_;
- absl::optional<uint32_t> target_bitrate_bps_;
- std::unique_ptr<rtc::TaskQueue> task_queue_;
-};
-
-TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) {
- // Test no change.
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
- EXPECT_CALL(*pacer_,
- SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
- bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- EXPECT_CALL(*pacer_,
- SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
- bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps);
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-}
-
-TEST_F(SendSideCongestionControllerTest, OnSendQueueFull) {
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
-
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->Process();
-
- // Let the pacer not be full next time the controller checks.
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
-
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- controller_->Process();
-}
-
-TEST_F(SendSideCongestionControllerTest, OnSendQueueFullAndEstimateChange) {
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->Process();
-
- // Receive new estimate but let the queue still be full.
- bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
- // The send pacer should get the new estimate though.
- EXPECT_CALL(*pacer_,
- SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-
- // Let the pacer not be full next time the controller checks.
- // |OnNetworkChanged| should be called with the new estimate.
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-}
-
-TEST_F(SendSideCongestionControllerTest, SignalNetworkState) {
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->SignalNetworkState(kNetworkDown);
- controller_->WaitOnTasks();
-
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- controller_->SignalNetworkState(kNetworkUp);
- controller_->WaitOnTasks();
-
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->SignalNetworkState(kNetworkDown);
-}
-
-TEST_F(SendSideCongestionControllerTest, OnNetworkRouteChanged) {
- int new_bitrate = 200000;
- EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _));
- EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _));
- rtc::NetworkRoute route;
- route.local_network_id = 1;
- controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1);
- controller_->WaitOnTasks();
-
- testing::Mock::VerifyAndClearExpectations(pacer_.get());
- testing::Mock::VerifyAndClearExpectations(&observer_);
- // If the bitrate is reset to -1, the new starting bitrate will be
- // the minimum default bitrate kMinBitrateBps.
- EXPECT_CALL(
- observer_,
- OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _));
- EXPECT_CALL(
- *pacer_,
- SetPacingRates(
- congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _));
- route.local_network_id = 2;
- controller_->OnNetworkRouteChanged(route, -1, -1, -1);
-}
-
-TEST_F(SendSideCongestionControllerTest, OldFeedback) {
- int new_bitrate = 200000;
- testing::Mock::VerifyAndClearExpectations(pacer_.get());
- EXPECT_CALL(observer_, OnNetworkChanged(new_bitrate, _, _, _));
- EXPECT_CALL(*pacer_, SetPacingRates(new_bitrate * kDefaultPacingRate, _));
-
- // Send a few packets on the first network route.
- std::vector<PacketFeedback> packets;
- packets.push_back(PacketFeedback(0, 0, 0, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(10, 10, 1, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(20, 20, 2, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(30, 30, 3, 1500, kPacingInfo1));
- packets.push_back(PacketFeedback(40, 40, 4, 1500, kPacingInfo1));
-
- for (const PacketFeedback& packet : packets)
- OnSentPacket(packet);
-
- // Change route and then insert a number of feedback packets.
- rtc::NetworkRoute route;
- route.local_network_id = 1;
- controller_->OnNetworkRouteChanged(route, new_bitrate, -1, -1);
-
- for (const PacketFeedback& packet : packets) {
- rtcp::TransportFeedback feedback;
- feedback.SetBase(packet.sequence_number, packet.arrival_time_ms * 1000);
-
- EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
- packet.arrival_time_ms * 1000));
- feedback.Build();
- controller_->OnTransportFeedback(feedback);
- }
- controller_->WaitOnTasks();
- // If the bitrate is reset to -1, the new starting bitrate will be
- // the minimum default bitrate kMinBitrateBps.
- EXPECT_CALL(
- observer_,
- OnNetworkChanged(congestion_controller::GetMinBitrateBps(), _, _, _));
- EXPECT_CALL(
- *pacer_,
- SetPacingRates(
- congestion_controller::GetMinBitrateBps() * kDefaultPacingRate, _));
- route.local_network_id = 2;
- controller_->OnNetworkRouteChanged(route, -1, -1, -1);
-}
-
-TEST_F(SendSideCongestionControllerTest,
- SignalNetworkStateAndQueueIsFullAndEstimateChange) {
- // Send queue is full.
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs + 1));
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->Process();
-
- // Queue is full and network is down. Expect no bitrate change.
- controller_->SignalNetworkState(kNetworkDown);
- controller_->Process();
-
- // Queue is full but network is up. Expect no bitrate change.
- controller_->SignalNetworkState(kNetworkUp);
- controller_->Process();
-
- // Receive new estimate but let the queue still be full.
- EXPECT_CALL(*pacer_,
- SetPacingRates(kInitialBitrateBps * 2 * kDefaultPacingRate, _));
- bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-
- // Let the pacer not be full next time the controller checks.
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillRepeatedly(Return(PacedSender::kMaxQueueLengthMs - 1));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
- controller_->Process();
-}
-
-TEST_F(SendSideCongestionControllerTest, GetProbingInterval) {
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-
- EXPECT_CALL(observer_, OnNetworkChanged(_, _, _, testing::Ne(0)));
- EXPECT_CALL(*pacer_, SetPacingRates(_, _));
- bandwidth_observer_->OnReceivedEstimatedBitrate(kInitialBitrateBps * 2);
- clock_.AdvanceTimeMilliseconds(25);
- controller_->Process();
-}
-
-TEST_F(SendSideCongestionControllerTest, ProbeOnRouteChange) {
- EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 6));
- EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 12));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 2, _, _, _));
- rtc::NetworkRoute route;
- route.local_network_id = 1;
- controller_->OnNetworkRouteChanged(route, 2 * kInitialBitrateBps, 0,
- 20 * kInitialBitrateBps);
- controller_->Process();
-}
-
-// Estimated bitrate reduced when the feedbacks arrive with such a long delay,
-// that the send-time-history no longer holds the feedbacked packets.
-TEST_F(SendSideCongestionControllerTest, LongFeedbackDelays) {
- TargetBitrateTrackingSetup();
-
- const int64_t kFeedbackTimeoutMs = 60001;
- const int kMaxConsecutiveFailedLookups = 5;
- for (int i = 0; i < kMaxConsecutiveFailedLookups; ++i) {
- std::vector<PacketFeedback> packets;
- packets.push_back(
- PacketFeedback(i * 100, 2 * i * 100, 0, 1500, kPacingInfo0));
- packets.push_back(
- PacketFeedback(i * 100 + 10, 2 * i * 100 + 10, 1, 1500, kPacingInfo0));
- packets.push_back(
- PacketFeedback(i * 100 + 20, 2 * i * 100 + 20, 2, 1500, kPacingInfo0));
- packets.push_back(
- PacketFeedback(i * 100 + 30, 2 * i * 100 + 30, 3, 1500, kPacingInfo1));
- packets.push_back(
- PacketFeedback(i * 100 + 40, 2 * i * 100 + 40, 4, 1500, kPacingInfo1));
-
- for (const PacketFeedback& packet : packets)
- OnSentPacket(packet);
-
- rtcp::TransportFeedback feedback;
- feedback.SetBase(packets[0].sequence_number,
- packets[0].arrival_time_ms * 1000);
-
- for (const PacketFeedback& packet : packets) {
- EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
- packet.arrival_time_ms * 1000));
- }
-
- feedback.Build();
-
- clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs);
- PacketFeedback later_packet(kFeedbackTimeoutMs + i * 100 + 40,
- kFeedbackTimeoutMs + i * 200 + 40, 5, 1500,
- kPacingInfo1);
- OnSentPacket(later_packet);
-
- controller_->OnTransportFeedback(feedback);
-
- // Check that packets have timed out.
- for (PacketFeedback& packet : packets) {
- packet.send_time_ms = PacketFeedback::kNoSendTime;
- packet.payload_size = 0;
- packet.pacing_info = PacedPacketInfo();
- }
- ComparePacketFeedbackVectors(packets,
- controller_->GetTransportFeedbackVector());
- }
-
- controller_->Process();
-
- EXPECT_EQ(kInitialBitrateBps / 2, target_bitrate_bps_);
-
- // Test with feedback that isn't late enough to time out.
- {
- std::vector<PacketFeedback> packets;
- packets.push_back(PacketFeedback(100, 200, 0, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(110, 210, 1, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(120, 220, 2, 1500, kPacingInfo0));
- packets.push_back(PacketFeedback(130, 230, 3, 1500, kPacingInfo1));
- packets.push_back(PacketFeedback(140, 240, 4, 1500, kPacingInfo1));
-
- for (const PacketFeedback& packet : packets)
- OnSentPacket(packet);
-
- rtcp::TransportFeedback feedback;
- feedback.SetBase(packets[0].sequence_number,
- packets[0].arrival_time_ms * 1000);
-
- for (const PacketFeedback& packet : packets) {
- EXPECT_TRUE(feedback.AddReceivedPacket(packet.sequence_number,
- packet.arrival_time_ms * 1000));
- }
-
- feedback.Build();
-
- clock_.AdvanceTimeMilliseconds(kFeedbackTimeoutMs - 1);
- PacketFeedback later_packet(kFeedbackTimeoutMs + 140,
- kFeedbackTimeoutMs + 240, 5, 1500,
- kPacingInfo1);
- OnSentPacket(later_packet);
-
- controller_->OnTransportFeedback(feedback);
- ComparePacketFeedbackVectors(packets,
- controller_->GetTransportFeedbackVector());
- }
-}
-
-// Bandwidth estimation is updated when feedbacks are received.
-// Feedbacks which show an increasing delay cause the estimation to be reduced.
-TEST_F(SendSideCongestionControllerTest, UpdatesDelayBasedEstimate) {
- TargetBitrateTrackingSetup();
-
- const int64_t kRunTimeMs = 6000;
- uint16_t seq_num = 0;
-
- // The test must run and insert packets/feedback long enough that the
- // BWE computes a valid estimate. This is first done in an environment which
- // simulates no bandwidth limitation, and therefore not built-up delay.
- PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 0);
- ASSERT_TRUE(target_bitrate_bps_);
-
- // Repeat, but this time with a building delay, and make sure that the
- // estimation is adjusted downwards.
- uint32_t bitrate_before_delay = *target_bitrate_bps_;
- PacketTransmissionAndFeedbackBlock(&seq_num, kRunTimeMs, 50);
- EXPECT_LT(*target_bitrate_bps_, bitrate_before_delay);
-}
-
-TEST_F(SendSideCongestionControllerTest, PacerQueueEncodeRatePushback) {
- ::webrtc::test::ScopedFieldTrials pushback_field_trial(
- "WebRTC-PacerPushbackExperiment/Enabled/");
- SetUp();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
- controller_->Process();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(100));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps * 0.9, _, _, _));
- controller_->Process();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(50));
- controller_->Process();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- controller_->Process();
-
- const uint32_t kMinAdjustedBps = 50000;
- int expected_queue_threshold =
- 1000 - kMinAdjustedBps * 1000.0 / kInitialBitrateBps;
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillOnce(Return(expected_queue_threshold));
- EXPECT_CALL(observer_, OnNetworkChanged(Ge(kMinAdjustedBps), _, _, _));
- controller_->Process();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs())
- .WillOnce(Return(expected_queue_threshold + 1));
- EXPECT_CALL(observer_, OnNetworkChanged(0, _, _, _));
- controller_->Process();
-
- EXPECT_CALL(*pacer_, ExpectedQueueTimeMs()).WillOnce(Return(0));
- EXPECT_CALL(observer_, OnNetworkChanged(kInitialBitrateBps, _, _, _));
- controller_->Process();
-}
-
-} // namespace test
-} // namespace webrtc_cc
-} // namespace webrtc