Implements a task-queue based PacedSender, wires it up for field trials
Bug: webrtc:10809
Change-Id: Ia181c16559f4598f32dd399c24802d0a289e250b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150942
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29946}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index c7ccc92..a44b534 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -72,9 +72,24 @@
const WebRtcKeyValueConfig* trials)
: clock_(clock),
event_log_(event_log),
+ field_trials_(trials ? trials : &fallback_field_trials_),
bitrate_configurator_(bitrate_config),
process_thread_(std::move(process_thread)),
- pacer_(clock, &packet_router_, event_log, trials, process_thread_.get()),
+ use_task_queue_pacer_(IsEnabled(field_trials_, "WebRTC-TaskQueuePacer")),
+ process_thread_pacer_(use_task_queue_pacer_
+ ? nullptr
+ : new PacedSender(clock,
+ &packet_router_,
+ event_log,
+ field_trials_,
+ process_thread_.get())),
+ task_queue_pacer_(use_task_queue_pacer_
+ ? new TaskQueuePacedSender(clock,
+ &packet_router_,
+ event_log,
+ field_trials_,
+ task_queue_factory)
+ : nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
@@ -82,11 +97,12 @@
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
reset_feedback_on_route_change_(
- !IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),
+ !IsEnabled(field_trials_, "WebRTC-Bwe-NoFeedbackReset")),
send_side_bwe_with_overhead_(
- IsEnabled(trials, "WebRTC-SendSideBwe-WithOverhead")),
+ IsEnabled(field_trials_, "WebRTC-SendSideBwe-WithOverhead")),
add_pacing_to_cwin_(
- IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),
+ IsEnabled(field_trials_,
+ "WebRTC-AddPacingToCongestionWindowPushback")),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
@@ -95,17 +111,21 @@
TaskQueueFactory::Priority::NORMAL)) {
initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
initial_config_.event_log = event_log;
- initial_config_.key_value_config = &trial_based_config_;
+ initial_config_.key_value_config = field_trials_;
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps),
DataRate::Zero());
- process_thread_->Start();
+ if (!use_task_queue_pacer_) {
+ process_thread_->Start();
+ }
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
- process_thread_->Stop();
+ if (!use_task_queue_pacer_) {
+ process_thread_->Stop();
+ }
}
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
@@ -153,15 +173,17 @@
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
- // TODO(bugs.webrtc.org/10809): Return reference to the correct
- // pacer implementation.
- return &pacer_;
+ if (use_task_queue_pacer_) {
+ return task_queue_pacer_.get();
+ }
+ return process_thread_pacer_.get();
}
const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
- // TODO(bugs.webrtc.org/10809): Return reference to the correct
- // pacer implementation.
- return &pacer_;
+ if (use_task_queue_pacer_) {
+ return task_queue_pacer_.get();
+ }
+ return process_thread_pacer_.get();
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@@ -183,9 +205,10 @@
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
- // TODO(bugs.webrtc.org/10809): Return reference to the correct
- // pacer implementation.
- return &pacer_;
+ if (use_task_queue_pacer_) {
+ return task_queue_pacer_.get();
+ }
+ return process_thread_pacer_.get();
}
void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index 2cadaa3..32c762b 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -24,8 +24,10 @@
#include "call/rtp_video_sender.h"
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
+#include "modules/pacing/paced_sender.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
+#include "modules/pacing/task_queue_paced_sender.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/network_route.h"
@@ -137,13 +139,17 @@
Clock* const clock_;
RtcEventLog* const event_log_;
- const FieldTrialBasedConfig trial_based_config_;
+ // TODO(sprang): Remove fallback field-trials.
+ const FieldTrialBasedConfig fallback_field_trials_;
+ const WebRtcKeyValueConfig* field_trials_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
- PacedSender pacer_;
+ const bool use_task_queue_pacer_;
+ std::unique_ptr<PacedSender> process_thread_pacer_;
+ std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 5166cab..d59d2b9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -26,6 +26,8 @@
"round_robin_packet_queue.cc",
"round_robin_packet_queue.h",
"rtp_packet_pacer.h",
+ "task_queue_paced_sender.cc",
+ "task_queue_paced_sender.h",
]
deps = [
@@ -33,6 +35,7 @@
"..:module_api",
"../../api:function_view",
"../../api/rtc_event_log",
+ "../../api/task_queue:task_queue",
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
"../../api/transport:webrtc_key_value_config",
@@ -44,7 +47,10 @@
"../../logging:rtc_event_pacing",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",
+ "../../rtc_base:rtc_task_queue",
"../../rtc_base/experiments:field_trial_parser",
+ "../../rtc_base/synchronization:sequence_checker",
+ "../../rtc_base/task_utils:to_queued_task",
"../../system_wrappers",
"../../system_wrappers:metrics",
"../remote_bitrate_estimator",
@@ -78,6 +84,7 @@
"paced_sender_unittest.cc",
"pacing_controller_unittest.cc",
"packet_router_unittest.cc",
+ "task_queue_paced_sender_unittest.cc",
]
deps = [
":interval_budget",
@@ -93,6 +100,7 @@
"../../system_wrappers:field_trial",
"../../test:field_trial",
"../../test:test_support",
+ "../../test/time_controller:time_controller",
"../rtp_rtcp",
"../rtp_rtcp:mock_rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 8be6209..0d0d1ae 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -241,6 +241,10 @@
return packet_queue_.Size();
}
+DataSize PacingController::CurrentBufferLevel() const {
+ return std::max(media_debt_, padding_debt_);
+}
+
absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
return first_sent_packet_time_;
}
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 6a05eac..f39887d 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -111,9 +111,14 @@
// Returns the time since the oldest queued packet was enqueued.
TimeDelta OldestPacketWaitTime() const;
+ // Number of packets in the pacer queue.
size_t QueueSizePackets() const;
+ // Totals size of packets in the pacer queue.
DataSize QueueSizeData() const;
+ // Current buffer level, i.e. max of media and padding debt.
+ DataSize CurrentBufferLevel() const;
+
// Returns the time when the first packet was sent;
absl::optional<Timestamp> FirstSentPacketTime() const;
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
new file mode 100644
index 0000000..e1745db
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -0,0 +1,254 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/task_queue_paced_sender.h"
+
+#include <algorithm>
+#include <utility>
+#include "absl/memory/memory.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/event.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc {
+namespace {
+// If no calls to MaybeProcessPackets() happen, make sure we update stats
+// at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
+// completely drained.
+constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis<33>();
+// Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
+// for performance reasons.
+constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis<1>();
+} // namespace
+
+TaskQueuePacedSender::TaskQueuePacedSender(
+ Clock* clock,
+ PacketRouter* packet_router,
+ RtcEventLog* event_log,
+ const WebRtcKeyValueConfig* field_trials,
+ TaskQueueFactory* task_queue_factory)
+ : clock_(clock),
+ packet_router_(packet_router),
+ pacing_controller_(clock,
+ static_cast<PacingController::PacketSender*>(this),
+ event_log,
+ field_trials,
+ PacingController::ProcessMode::kDynamic),
+ next_process_time_(Timestamp::MinusInfinity()),
+ stats_update_scheduled_(false),
+ last_stats_time_(Timestamp::MinusInfinity()),
+ is_shutdown_(false),
+ task_queue_(task_queue_factory->CreateTaskQueue(
+ "TaskQueuePacedSender",
+ TaskQueueFactory::Priority::NORMAL)) {}
+
+TaskQueuePacedSender::~TaskQueuePacedSender() {
+ // Post an immediate task to mark the queue as shutting down.
+ // The rtc::TaskQueue destructor will wait for pending tasks to
+ // complete before continuing.
+ task_queue_.PostTask([&]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ is_shutdown_ = true;
+ });
+}
+
+void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
+ int cluster_id) {
+ task_queue_.PostTask([this, bitrate, cluster_id]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::Pause() {
+ task_queue_.PostTask([this]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.Pause();
+ });
+}
+
+void TaskQueuePacedSender::Resume() {
+ task_queue_.PostTask([this]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.Resume();
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::SetCongestionWindow(
+ DataSize congestion_window_size) {
+ task_queue_.PostTask([this, congestion_window_size]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.SetCongestionWindow(congestion_window_size);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
+ if (task_queue_.IsCurrent()) {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ // Fast path since this can be called once per sent packet while on the
+ // task queue.
+ pacing_controller_.UpdateOutstandingData(outstanding_data);
+ return;
+ }
+
+ task_queue_.PostTask([this, outstanding_data]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.UpdateOutstandingData(outstanding_data);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
+ DataRate padding_rate) {
+ task_queue_.PostTask([this, pacing_rate, padding_rate]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::EnqueuePackets(
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
+ task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ for (auto& packet : packets_) {
+ pacing_controller_.EnqueuePacket(std::move(packet));
+ }
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
+ task_queue_.PostTask([this, account_for_audio]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.SetAccountForAudioPackets(account_for_audio);
+ });
+}
+
+void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
+ task_queue_.PostTask([this, limit]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ pacing_controller_.SetQueueTimeLimit(limit);
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+ });
+}
+
+TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
+ return GetStats().expected_queue_time;
+}
+
+DataSize TaskQueuePacedSender::QueueSizeData() const {
+ return GetStats().queue_size;
+}
+
+absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
+ return GetStats().first_sent_packet_time;
+}
+
+TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
+ return GetStats().oldest_packet_wait_time;
+}
+
+void TaskQueuePacedSender::MaybeProcessPackets(
+ Timestamp scheduled_process_time) {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+
+ if (is_shutdown_) {
+ return;
+ }
+
+ const Timestamp now = clock_->CurrentTime();
+ // Run ProcessPackets() only if this is the schedules task, or if there is
+ // no scheduled task and we need to process immediately.
+ if ((scheduled_process_time.IsFinite() &&
+ scheduled_process_time == next_process_time_) ||
+ (next_process_time_.IsInfinite() &&
+ pacing_controller_.NextSendTime() <= now)) {
+ pacing_controller_.ProcessPackets();
+ next_process_time_ = Timestamp::MinusInfinity();
+ }
+
+ Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime,
+ pacing_controller_.NextSendTime());
+ TimeDelta sleep_time = next_process_time - now;
+ if (next_process_time_.IsMinusInfinity() ||
+ next_process_time <=
+ next_process_time_ - PacingController::kMinSleepTime) {
+ next_process_time_ = next_process_time;
+
+ task_queue_.PostDelayedTask(
+ [this, next_process_time]() { MaybeProcessPackets(next_process_time); },
+ sleep_time.ms<uint32_t>());
+ }
+
+ MaybeUpdateStats(false);
+}
+
+std::vector<std::unique_ptr<RtpPacketToSend>>
+TaskQueuePacedSender::GeneratePadding(DataSize size) {
+ return packet_router_->GeneratePadding(size.bytes());
+}
+
+void TaskQueuePacedSender::SendRtpPacket(
+ std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ packet_router_->SendPacket(std::move(packet), cluster_info);
+}
+
+void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
+ if (is_shutdown_) {
+ return;
+ }
+
+ Timestamp now = clock_->CurrentTime();
+ if (!is_scheduled_call &&
+ now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
+ // Too frequent unscheduled stats update, return early.
+ return;
+ }
+
+ rtc::CritScope cs(&stats_crit_);
+ current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime();
+ current_stats_.first_sent_packet_time =
+ pacing_controller_.FirstSentPacketTime();
+ current_stats_.oldest_packet_wait_time =
+ pacing_controller_.OldestPacketWaitTime();
+ current_stats_.queue_size = pacing_controller_.QueueSizeData();
+ last_stats_time_ = now;
+
+ bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
+ pacing_controller_.CurrentBufferLevel().IsZero();
+
+ // If there's anything interesting to get from the pacer and this is a
+ // scheduled call (no scheduled call in flight), post a new scheduled stats
+ // update.
+ if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) {
+ task_queue_.PostDelayedTask(
+ [this]() {
+ RTC_DCHECK_RUN_ON(&task_queue_);
+ MaybeUpdateStats(true);
+ },
+ kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
+ stats_update_scheduled_ = true;
+ } else {
+ stats_update_scheduled_ = false;
+ }
+}
+
+TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
+ rtc::CritScope cs(&stats_crit_);
+ return current_stats_;
+}
+
+} // namespace webrtc
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
new file mode 100644
index 0000000..719886a
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
+#define MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/data_size.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "modules/include/module.h"
+#include "modules/pacing/pacing_controller.h"
+#include "modules/pacing/packet_router.h"
+#include "modules/pacing/rtp_packet_pacer.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+class Clock;
+class RtcEventLog;
+
+class TaskQueuePacedSender : public RtpPacketPacer,
+ public RtpPacketSender,
+ private PacingController::PacketSender {
+ public:
+ TaskQueuePacedSender(Clock* clock,
+ PacketRouter* packet_router,
+ RtcEventLog* event_log,
+ const WebRtcKeyValueConfig* field_trials,
+ TaskQueueFactory* task_queue_factory);
+
+ ~TaskQueuePacedSender() override;
+
+ // Methods implementing RtpPacketSender.
+
+ // Adds the packet to the queue and calls PacketRouter::SendPacket() when
+ // it's time to send.
+ void EnqueuePackets(
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
+
+ // Methods implementing RtpPacketPacer:
+
+ void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
+
+ // Temporarily pause all sending.
+ void Pause() override;
+
+ // Resume sending packets.
+ void Resume() override;
+
+ void SetCongestionWindow(DataSize congestion_window_size) override;
+ void UpdateOutstandingData(DataSize outstanding_data) override;
+
+ // Sets the pacing rates. Must be called once before packets can be sent.
+ void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
+
+ // Currently audio traffic is not accounted for by pacer and passed through.
+ // With the introduction of audio BWE, audio traffic will be accounted for
+ // in the pacer budget calculation. The audio traffic will still be injected
+ // at high priority.
+ void SetAccountForAudioPackets(bool account_for_audio) override;
+
+ // Returns the time since the oldest queued packet was enqueued.
+ TimeDelta OldestPacketWaitTime() const override;
+
+ // Returns total size of all packets in the pacer queue.
+ DataSize QueueSizeData() const override;
+
+ // Returns the time when the first packet was sent;
+ absl::optional<Timestamp> FirstSentPacketTime() const override;
+
+ // Returns the number of milliseconds it will take to send the current
+ // packets in the queue, given the current size and bitrate, ignoring prio.
+ TimeDelta ExpectedQueueTime() const override;
+
+ // Set the max desired queuing delay, pacer will override the pacing rate
+ // specified by SetPacingRates() if needed to achieve this goal.
+ void SetQueueTimeLimit(TimeDelta limit) override;
+
+ private:
+ struct Stats {
+ Stats()
+ : oldest_packet_wait_time(TimeDelta::Zero()),
+ queue_size(DataSize::Zero()),
+ expected_queue_time(TimeDelta::Zero()) {}
+ TimeDelta oldest_packet_wait_time;
+ DataSize queue_size;
+ TimeDelta expected_queue_time;
+ absl::optional<Timestamp> first_sent_packet_time;
+ };
+
+ // Check if it is time to send packets, or schedule a delayed task if not.
+ // Use Timestamp::MinusInfinity() to indicate that this call has _not_
+ // been scheduled by the pacing controller. If this is the case, check if
+ // can execute immediately otherwise schedule a delay task that calls this
+ // method again with desired (finite) scheduled process time.
+ void MaybeProcessPackets(Timestamp scheduled_process_time);
+
+ // Methods implementing PacedSenderController:PacketSender.
+
+ void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) override
+ RTC_RUN_ON(task_queue_);
+
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+ DataSize size) override RTC_RUN_ON(task_queue_);
+
+ void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_);
+ Stats GetStats() const;
+
+ Clock* const clock_;
+ PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
+ PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
+
+ // We want only one (valid) delayed process task in flight at a time.
+ // If the value of |next_process_time_| is finite, it is an id for a
+ // delayed task that will call MaybeProcessPackets() with that time
+ // as parameter.
+ // Timestamp::MinusInfinity() indicates no valid pending task.
+ Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_);
+
+ // Since we don't want to support synchronous calls that wait for a
+ // task execution, we poll the stats at some interval and update
+ // |current_stats_|, which can in turn be polled at any time.
+
+ // True iff there is delayed task in flight that that will call
+ // UdpateStats().
+ bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_);
+ // Last time stats were updated.
+ Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_);
+
+ // Indicates if this task queue is shutting down. If so, don't allow
+ // posting any more delayed tasks as that can cause the task queue to
+ // never drain.
+ bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
+
+ rtc::CriticalSection stats_crit_;
+ Stats current_stats_ RTC_GUARDED_BY(stats_crit_);
+
+ rtc::TaskQueue task_queue_;
+};
+} // namespace webrtc
+#endif // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
new file mode 100644
index 0000000..390523f
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -0,0 +1,176 @@
+/*
+ * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/task_queue_paced_sender.h"
+
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "modules/pacing/packet_router.h"
+#include "modules/utility/include/mock/mock_process_thread.h"
+#include "test/field_trial.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/time_controller/simulated_time_controller.h"
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::SaveArg;
+
+namespace webrtc {
+namespace {
+constexpr uint32_t kAudioSsrc = 12345;
+constexpr uint32_t kVideoSsrc = 234565;
+constexpr uint32_t kVideoRtxSsrc = 34567;
+constexpr uint32_t kFlexFecSsrc = 45678;
+constexpr size_t kDefaultPacketSize = 1234;
+
+class MockPacketRouter : public PacketRouter {
+ public:
+ MOCK_METHOD2(SendPacket,
+ void(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info));
+ MOCK_METHOD1(
+ GeneratePadding,
+ std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
+};
+} // namespace
+
+namespace test {
+
+class TaskQueuePacedSenderTest : public ::testing::Test {
+ public:
+ TaskQueuePacedSenderTest()
+ : time_controller_(Timestamp::ms(1234)),
+ pacer_(time_controller_.GetClock(),
+ &packet_router_,
+ /*event_log=*/nullptr,
+ /*field_trials=*/nullptr,
+ time_controller_.GetTaskQueueFactory()) {}
+
+ protected:
+ std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+ auto packet = std::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ packet->SetSsrc(kAudioSsrc);
+ break;
+ case RtpPacketToSend::Type::kVideo:
+ packet->SetSsrc(kVideoSsrc);
+ break;
+ case RtpPacketToSend::Type::kRetransmission:
+ case RtpPacketToSend::Type::kPadding:
+ packet->SetSsrc(kVideoRtxSsrc);
+ break;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ packet->SetSsrc(kFlexFecSsrc);
+ break;
+ }
+
+ packet->SetPayloadSize(kDefaultPacketSize);
+ return packet;
+ }
+
+ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
+ RtpPacketToSend::Type type,
+ size_t num_packets) {
+ std::vector<std::unique_ptr<RtpPacketToSend>> packets;
+ for (size_t i = 0; i < num_packets; ++i) {
+ packets.push_back(BuildRtpPacket(type));
+ }
+ return packets;
+ }
+
+ Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); }
+
+ GlobalSimulatedTimeController time_controller_;
+ MockPacketRouter packet_router_;
+ TaskQueuePacedSender pacer_;
+};
+
+TEST_F(TaskQueuePacedSenderTest, PacesPackets) {
+ // Insert a number of packets, covering one second.
+ static constexpr size_t kPacketsToSend = 42;
+ pacer_.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
+ DataRate::Zero());
+ pacer_.EnqueuePackets(
+ GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsToSend));
+
+ // Expect all of them to be sent.
+ size_t packets_sent = 0;
+ Timestamp end_time = Timestamp::PlusInfinity();
+ EXPECT_CALL(packet_router_, SendPacket)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ ++packets_sent;
+ if (packets_sent == kPacketsToSend) {
+ end_time = time_controller_.GetClock()->CurrentTime();
+ }
+ });
+
+ const Timestamp start_time = time_controller_.GetClock()->CurrentTime();
+
+ // Packets should be sent over a period of close to 1s. Expect a little lower
+ // than this since initial probing is a bit quicker.
+ time_controller_.Sleep(TimeDelta::seconds(1));
+ EXPECT_EQ(packets_sent, kPacketsToSend);
+ ASSERT_TRUE(end_time.IsFinite());
+ EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
+}
+
+TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
+ // Insert a number of packets to be sent 200ms apart.
+ const size_t kPacketsPerSecond = 5;
+ const DataRate kPacingRate =
+ DataRate::bps(kDefaultPacketSize * 8 * kPacketsPerSecond);
+ pacer_.SetPacingRates(kPacingRate, DataRate::Zero());
+
+ // Send some initial packets to be rid of any probes.
+ EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond);
+ pacer_.EnqueuePackets(
+ GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsPerSecond));
+ time_controller_.Sleep(TimeDelta::seconds(1));
+
+ // Insert three packets, and record send time of each of them.
+ // After the second packet is sent, double the send rate so we can
+ // check the third packets is sent after half the wait time.
+ Timestamp first_packet_time = Timestamp::MinusInfinity();
+ Timestamp second_packet_time = Timestamp::MinusInfinity();
+ Timestamp third_packet_time = Timestamp::MinusInfinity();
+
+ EXPECT_CALL(packet_router_, SendPacket)
+ .Times(3)
+ .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ if (first_packet_time.IsInfinite()) {
+ first_packet_time = CurrentTime();
+ } else if (second_packet_time.IsInfinite()) {
+ second_packet_time = CurrentTime();
+ pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero());
+ } else {
+ third_packet_time = CurrentTime();
+ }
+ });
+
+ pacer_.EnqueuePackets(GeneratePackets(RtpPacketToSend::Type::kVideo, 3));
+ time_controller_.Sleep(TimeDelta::ms(500));
+ ASSERT_TRUE(third_packet_time.IsFinite());
+ EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
+ 1.0);
+ EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
+ 1.0);
+}
+
+} // namespace test
+} // namespace webrtc