Implements a task-queue based PacedSender, wires it up for field trials

Bug: webrtc:10809
Change-Id: Ia181c16559f4598f32dd399c24802d0a289e250b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150942
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29946}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index c7ccc92..a44b534 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -72,9 +72,24 @@
     const WebRtcKeyValueConfig* trials)
     : clock_(clock),
       event_log_(event_log),
+      field_trials_(trials ? trials : &fallback_field_trials_),
       bitrate_configurator_(bitrate_config),
       process_thread_(std::move(process_thread)),
-      pacer_(clock, &packet_router_, event_log, trials, process_thread_.get()),
+      use_task_queue_pacer_(IsEnabled(field_trials_, "WebRTC-TaskQueuePacer")),
+      process_thread_pacer_(use_task_queue_pacer_
+                                ? nullptr
+                                : new PacedSender(clock,
+                                                  &packet_router_,
+                                                  event_log,
+                                                  field_trials_,
+                                                  process_thread_.get())),
+      task_queue_pacer_(use_task_queue_pacer_
+                            ? new TaskQueuePacedSender(clock,
+                                                       &packet_router_,
+                                                       event_log,
+                                                       field_trials_,
+                                                       task_queue_factory)
+                            : nullptr),
       observer_(nullptr),
       controller_factory_override_(controller_factory),
       controller_factory_fallback_(
@@ -82,11 +97,12 @@
       process_interval_(controller_factory_fallback_->GetProcessInterval()),
       last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
       reset_feedback_on_route_change_(
-          !IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),
+          !IsEnabled(field_trials_, "WebRTC-Bwe-NoFeedbackReset")),
       send_side_bwe_with_overhead_(
-          IsEnabled(trials, "WebRTC-SendSideBwe-WithOverhead")),
+          IsEnabled(field_trials_, "WebRTC-SendSideBwe-WithOverhead")),
       add_pacing_to_cwin_(
-          IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),
+          IsEnabled(field_trials_,
+                    "WebRTC-AddPacingToCongestionWindowPushback")),
       transport_overhead_bytes_per_packet_(0),
       network_available_(false),
       retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
@@ -95,17 +111,21 @@
           TaskQueueFactory::Priority::NORMAL)) {
   initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
   initial_config_.event_log = event_log;
-  initial_config_.key_value_config = &trial_based_config_;
+  initial_config_.key_value_config = field_trials_;
   RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
 
   pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps),
                           DataRate::Zero());
 
-  process_thread_->Start();
+  if (!use_task_queue_pacer_) {
+    process_thread_->Start();
+  }
 }
 
 RtpTransportControllerSend::~RtpTransportControllerSend() {
-  process_thread_->Stop();
+  if (!use_task_queue_pacer_) {
+    process_thread_->Stop();
+  }
 }
 
 RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
@@ -153,15 +173,17 @@
 }
 
 RtpPacketPacer* RtpTransportControllerSend::pacer() {
-  // TODO(bugs.webrtc.org/10809): Return reference to the correct
-  // pacer implementation.
-  return &pacer_;
+  if (use_task_queue_pacer_) {
+    return task_queue_pacer_.get();
+  }
+  return process_thread_pacer_.get();
 }
 
 const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
-  // TODO(bugs.webrtc.org/10809): Return reference to the correct
-  // pacer implementation.
-  return &pacer_;
+  if (use_task_queue_pacer_) {
+    return task_queue_pacer_.get();
+  }
+  return process_thread_pacer_.get();
 }
 
 rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@@ -183,9 +205,10 @@
 }
 
 RtpPacketSender* RtpTransportControllerSend::packet_sender() {
-  // TODO(bugs.webrtc.org/10809): Return reference to the correct
-  // pacer implementation.
-  return &pacer_;
+  if (use_task_queue_pacer_) {
+    return task_queue_pacer_.get();
+  }
+  return process_thread_pacer_.get();
 }
 
 void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index 2cadaa3..32c762b 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -24,8 +24,10 @@
 #include "call/rtp_video_sender.h"
 #include "modules/congestion_controller/rtp/control_handler.h"
 #include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
+#include "modules/pacing/paced_sender.h"
 #include "modules/pacing/packet_router.h"
 #include "modules/pacing/rtp_packet_pacer.h"
+#include "modules/pacing/task_queue_paced_sender.h"
 #include "modules/utility/include/process_thread.h"
 #include "rtc_base/constructor_magic.h"
 #include "rtc_base/network_route.h"
@@ -137,13 +139,17 @@
 
   Clock* const clock_;
   RtcEventLog* const event_log_;
-  const FieldTrialBasedConfig trial_based_config_;
+  // TODO(sprang): Remove fallback field-trials.
+  const FieldTrialBasedConfig fallback_field_trials_;
+  const WebRtcKeyValueConfig* field_trials_;
   PacketRouter packet_router_;
   std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
   RtpBitrateConfigurator bitrate_configurator_;
   std::map<std::string, rtc::NetworkRoute> network_routes_;
   const std::unique_ptr<ProcessThread> process_thread_;
-  PacedSender pacer_;
+  const bool use_task_queue_pacer_;
+  std::unique_ptr<PacedSender> process_thread_pacer_;
+  std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
 
   TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
 
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 5166cab..d59d2b9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -26,6 +26,8 @@
     "round_robin_packet_queue.cc",
     "round_robin_packet_queue.h",
     "rtp_packet_pacer.h",
+    "task_queue_paced_sender.cc",
+    "task_queue_paced_sender.h",
   ]
 
   deps = [
@@ -33,6 +35,7 @@
     "..:module_api",
     "../../api:function_view",
     "../../api/rtc_event_log",
+    "../../api/task_queue:task_queue",
     "../../api/transport:field_trial_based_config",
     "../../api/transport:network_control",
     "../../api/transport:webrtc_key_value_config",
@@ -44,7 +47,10 @@
     "../../logging:rtc_event_pacing",
     "../../rtc_base:checks",
     "../../rtc_base:rtc_base_approved",
+    "../../rtc_base:rtc_task_queue",
     "../../rtc_base/experiments:field_trial_parser",
+    "../../rtc_base/synchronization:sequence_checker",
+    "../../rtc_base/task_utils:to_queued_task",
     "../../system_wrappers",
     "../../system_wrappers:metrics",
     "../remote_bitrate_estimator",
@@ -78,6 +84,7 @@
       "paced_sender_unittest.cc",
       "pacing_controller_unittest.cc",
       "packet_router_unittest.cc",
+      "task_queue_paced_sender_unittest.cc",
     ]
     deps = [
       ":interval_budget",
@@ -93,6 +100,7 @@
       "../../system_wrappers:field_trial",
       "../../test:field_trial",
       "../../test:test_support",
+      "../../test/time_controller:time_controller",
       "../rtp_rtcp",
       "../rtp_rtcp:mock_rtp_rtcp",
       "../rtp_rtcp:rtp_rtcp_format",
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index 8be6209..0d0d1ae 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -241,6 +241,10 @@
   return packet_queue_.Size();
 }
 
+DataSize PacingController::CurrentBufferLevel() const {
+  return std::max(media_debt_, padding_debt_);
+}
+
 absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
   return first_sent_packet_time_;
 }
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 6a05eac..f39887d 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -111,9 +111,14 @@
   // Returns the time since the oldest queued packet was enqueued.
   TimeDelta OldestPacketWaitTime() const;
 
+  // Number of packets in the pacer queue.
   size_t QueueSizePackets() const;
+  // Totals size of packets in the pacer queue.
   DataSize QueueSizeData() const;
 
+  // Current buffer level, i.e. max of media and padding debt.
+  DataSize CurrentBufferLevel() const;
+
   // Returns the time when the first packet was sent;
   absl::optional<Timestamp> FirstSentPacketTime() const;
 
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
new file mode 100644
index 0000000..e1745db
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -0,0 +1,254 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/task_queue_paced_sender.h"
+
+#include <algorithm>
+#include <utility>
+#include "absl/memory/memory.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/event.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+
+namespace webrtc {
+namespace {
+// If no calls to MaybeProcessPackets() happen, make sure we update stats
+// at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
+// completely drained.
+constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis<33>();
+// Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
+// for performance reasons.
+constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis<1>();
+}  // namespace
+
+TaskQueuePacedSender::TaskQueuePacedSender(
+    Clock* clock,
+    PacketRouter* packet_router,
+    RtcEventLog* event_log,
+    const WebRtcKeyValueConfig* field_trials,
+    TaskQueueFactory* task_queue_factory)
+    : clock_(clock),
+      packet_router_(packet_router),
+      pacing_controller_(clock,
+                         static_cast<PacingController::PacketSender*>(this),
+                         event_log,
+                         field_trials,
+                         PacingController::ProcessMode::kDynamic),
+      next_process_time_(Timestamp::MinusInfinity()),
+      stats_update_scheduled_(false),
+      last_stats_time_(Timestamp::MinusInfinity()),
+      is_shutdown_(false),
+      task_queue_(task_queue_factory->CreateTaskQueue(
+          "TaskQueuePacedSender",
+          TaskQueueFactory::Priority::NORMAL)) {}
+
+TaskQueuePacedSender::~TaskQueuePacedSender() {
+  // Post an immediate task to mark the queue as shutting down.
+  // The rtc::TaskQueue destructor will wait for pending tasks to
+  // complete before continuing.
+  task_queue_.PostTask([&]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    is_shutdown_ = true;
+  });
+}
+
+void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
+                                              int cluster_id) {
+  task_queue_.PostTask([this, bitrate, cluster_id]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::Pause() {
+  task_queue_.PostTask([this]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.Pause();
+  });
+}
+
+void TaskQueuePacedSender::Resume() {
+  task_queue_.PostTask([this]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.Resume();
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::SetCongestionWindow(
+    DataSize congestion_window_size) {
+  task_queue_.PostTask([this, congestion_window_size]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.SetCongestionWindow(congestion_window_size);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
+  if (task_queue_.IsCurrent()) {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    // Fast path since this can be called once per sent packet while on the
+    // task queue.
+    pacing_controller_.UpdateOutstandingData(outstanding_data);
+    return;
+  }
+
+  task_queue_.PostTask([this, outstanding_data]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.UpdateOutstandingData(outstanding_data);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
+                                          DataRate padding_rate) {
+  task_queue_.PostTask([this, pacing_rate, padding_rate]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::EnqueuePackets(
+    std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
+  task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    for (auto& packet : packets_) {
+      pacing_controller_.EnqueuePacket(std::move(packet));
+    }
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
+  task_queue_.PostTask([this, account_for_audio]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.SetAccountForAudioPackets(account_for_audio);
+  });
+}
+
+void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
+  task_queue_.PostTask([this, limit]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.SetQueueTimeLimit(limit);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
+TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
+  return GetStats().expected_queue_time;
+}
+
+DataSize TaskQueuePacedSender::QueueSizeData() const {
+  return GetStats().queue_size;
+}
+
+absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
+  return GetStats().first_sent_packet_time;
+}
+
+TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
+  return GetStats().oldest_packet_wait_time;
+}
+
+void TaskQueuePacedSender::MaybeProcessPackets(
+    Timestamp scheduled_process_time) {
+  RTC_DCHECK_RUN_ON(&task_queue_);
+
+  if (is_shutdown_) {
+    return;
+  }
+
+  const Timestamp now = clock_->CurrentTime();
+  // Run ProcessPackets() only if this is the schedules task, or if there is
+  // no scheduled task and we need to process immediately.
+  if ((scheduled_process_time.IsFinite() &&
+       scheduled_process_time == next_process_time_) ||
+      (next_process_time_.IsInfinite() &&
+       pacing_controller_.NextSendTime() <= now)) {
+    pacing_controller_.ProcessPackets();
+    next_process_time_ = Timestamp::MinusInfinity();
+  }
+
+  Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime,
+                                         pacing_controller_.NextSendTime());
+  TimeDelta sleep_time = next_process_time - now;
+  if (next_process_time_.IsMinusInfinity() ||
+      next_process_time <=
+          next_process_time_ - PacingController::kMinSleepTime) {
+    next_process_time_ = next_process_time;
+
+    task_queue_.PostDelayedTask(
+        [this, next_process_time]() { MaybeProcessPackets(next_process_time); },
+        sleep_time.ms<uint32_t>());
+  }
+
+  MaybeUpdateStats(false);
+}
+
+std::vector<std::unique_ptr<RtpPacketToSend>>
+TaskQueuePacedSender::GeneratePadding(DataSize size) {
+  return packet_router_->GeneratePadding(size.bytes());
+}
+
+void TaskQueuePacedSender::SendRtpPacket(
+    std::unique_ptr<RtpPacketToSend> packet,
+    const PacedPacketInfo& cluster_info) {
+  packet_router_->SendPacket(std::move(packet), cluster_info);
+}
+
+void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
+  if (is_shutdown_) {
+    return;
+  }
+
+  Timestamp now = clock_->CurrentTime();
+  if (!is_scheduled_call &&
+      now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
+    // Too frequent unscheduled stats update, return early.
+    return;
+  }
+
+  rtc::CritScope cs(&stats_crit_);
+  current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime();
+  current_stats_.first_sent_packet_time =
+      pacing_controller_.FirstSentPacketTime();
+  current_stats_.oldest_packet_wait_time =
+      pacing_controller_.OldestPacketWaitTime();
+  current_stats_.queue_size = pacing_controller_.QueueSizeData();
+  last_stats_time_ = now;
+
+  bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
+                       pacing_controller_.CurrentBufferLevel().IsZero();
+
+  // If there's anything interesting to get from the pacer and this is a
+  // scheduled call (no scheduled call in flight), post a new scheduled stats
+  // update.
+  if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) {
+    task_queue_.PostDelayedTask(
+        [this]() {
+          RTC_DCHECK_RUN_ON(&task_queue_);
+          MaybeUpdateStats(true);
+        },
+        kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
+    stats_update_scheduled_ = true;
+  } else {
+    stats_update_scheduled_ = false;
+  }
+}
+
+TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
+  rtc::CritScope cs(&stats_crit_);
+  return current_stats_;
+}
+
+}  // namespace webrtc
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
new file mode 100644
index 0000000..719886a
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -0,0 +1,162 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
+#define MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <functional>
+#include <memory>
+#include <queue>
+#include <vector>
+
+#include "absl/types/optional.h"
+#include "api/task_queue/task_queue_factory.h"
+#include "api/units/data_size.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "modules/include/module.h"
+#include "modules/pacing/pacing_controller.h"
+#include "modules/pacing/packet_router.h"
+#include "modules/pacing/rtp_packet_pacer.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/synchronization/sequence_checker.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/thread_annotations.h"
+
+namespace webrtc {
+class Clock;
+class RtcEventLog;
+
+class TaskQueuePacedSender : public RtpPacketPacer,
+                             public RtpPacketSender,
+                             private PacingController::PacketSender {
+ public:
+  TaskQueuePacedSender(Clock* clock,
+                       PacketRouter* packet_router,
+                       RtcEventLog* event_log,
+                       const WebRtcKeyValueConfig* field_trials,
+                       TaskQueueFactory* task_queue_factory);
+
+  ~TaskQueuePacedSender() override;
+
+  // Methods implementing RtpPacketSender.
+
+  // Adds the packet to the queue and calls PacketRouter::SendPacket() when
+  // it's time to send.
+  void EnqueuePackets(
+      std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
+
+  // Methods implementing RtpPacketPacer:
+
+  void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
+
+  // Temporarily pause all sending.
+  void Pause() override;
+
+  // Resume sending packets.
+  void Resume() override;
+
+  void SetCongestionWindow(DataSize congestion_window_size) override;
+  void UpdateOutstandingData(DataSize outstanding_data) override;
+
+  // Sets the pacing rates. Must be called once before packets can be sent.
+  void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
+
+  // Currently audio traffic is not accounted for by pacer and passed through.
+  // With the introduction of audio BWE, audio traffic will be accounted for
+  // in the pacer budget calculation. The audio traffic will still be injected
+  // at high priority.
+  void SetAccountForAudioPackets(bool account_for_audio) override;
+
+  // Returns the time since the oldest queued packet was enqueued.
+  TimeDelta OldestPacketWaitTime() const override;
+
+  // Returns total size of all packets in the pacer queue.
+  DataSize QueueSizeData() const override;
+
+  // Returns the time when the first packet was sent;
+  absl::optional<Timestamp> FirstSentPacketTime() const override;
+
+  // Returns the number of milliseconds it will take to send the current
+  // packets in the queue, given the current size and bitrate, ignoring prio.
+  TimeDelta ExpectedQueueTime() const override;
+
+  // Set the max desired queuing delay, pacer will override the pacing rate
+  // specified by SetPacingRates() if needed to achieve this goal.
+  void SetQueueTimeLimit(TimeDelta limit) override;
+
+ private:
+  struct Stats {
+    Stats()
+        : oldest_packet_wait_time(TimeDelta::Zero()),
+          queue_size(DataSize::Zero()),
+          expected_queue_time(TimeDelta::Zero()) {}
+    TimeDelta oldest_packet_wait_time;
+    DataSize queue_size;
+    TimeDelta expected_queue_time;
+    absl::optional<Timestamp> first_sent_packet_time;
+  };
+
+  // Check if it is time to send packets, or schedule a delayed task if not.
+  // Use Timestamp::MinusInfinity() to indicate that this call has _not_
+  // been scheduled by the pacing controller. If this is the case, check if
+  // can execute immediately otherwise schedule a delay task that calls this
+  // method again with desired (finite) scheduled process time.
+  void MaybeProcessPackets(Timestamp scheduled_process_time);
+
+  // Methods implementing PacedSenderController:PacketSender.
+
+  void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
+                     const PacedPacketInfo& cluster_info) override
+      RTC_RUN_ON(task_queue_);
+
+  std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
+      DataSize size) override RTC_RUN_ON(task_queue_);
+
+  void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_);
+  Stats GetStats() const;
+
+  Clock* const clock_;
+  PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
+  PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
+
+  // We want only one (valid) delayed process task in flight at a time.
+  // If the value of |next_process_time_| is finite, it is an id for a
+  // delayed task that will call MaybeProcessPackets() with that time
+  // as parameter.
+  // Timestamp::MinusInfinity() indicates no valid pending task.
+  Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_);
+
+  // Since we don't want to support synchronous calls that wait for a
+  // task execution, we poll the stats at some interval and update
+  // |current_stats_|, which can in turn be polled at any time.
+
+  // True iff there is delayed task in flight that that will call
+  // UdpateStats().
+  bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_);
+  // Last time stats were updated.
+  Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_);
+
+  // Indicates if this task queue is shutting down. If so, don't allow
+  // posting any more delayed tasks as that can cause the task queue to
+  // never drain.
+  bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
+
+  rtc::CriticalSection stats_crit_;
+  Stats current_stats_ RTC_GUARDED_BY(stats_crit_);
+
+  rtc::TaskQueue task_queue_;
+};
+}  // namespace webrtc
+#endif  // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
new file mode 100644
index 0000000..390523f
--- /dev/null
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -0,0 +1,176 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/task_queue_paced_sender.h"
+
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "modules/pacing/packet_router.h"
+#include "modules/utility/include/mock/mock_process_thread.h"
+#include "test/field_trial.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/time_controller/simulated_time_controller.h"
+
+using ::testing::_;
+using ::testing::Return;
+using ::testing::SaveArg;
+
+namespace webrtc {
+namespace {
+constexpr uint32_t kAudioSsrc = 12345;
+constexpr uint32_t kVideoSsrc = 234565;
+constexpr uint32_t kVideoRtxSsrc = 34567;
+constexpr uint32_t kFlexFecSsrc = 45678;
+constexpr size_t kDefaultPacketSize = 1234;
+
+class MockPacketRouter : public PacketRouter {
+ public:
+  MOCK_METHOD2(SendPacket,
+               void(std::unique_ptr<RtpPacketToSend> packet,
+                    const PacedPacketInfo& cluster_info));
+  MOCK_METHOD1(
+      GeneratePadding,
+      std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
+};
+}  // namespace
+
+namespace test {
+
+class TaskQueuePacedSenderTest : public ::testing::Test {
+ public:
+  TaskQueuePacedSenderTest()
+      : time_controller_(Timestamp::ms(1234)),
+        pacer_(time_controller_.GetClock(),
+               &packet_router_,
+               /*event_log=*/nullptr,
+               /*field_trials=*/nullptr,
+               time_controller_.GetTaskQueueFactory()) {}
+
+ protected:
+  std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+    auto packet = std::make_unique<RtpPacketToSend>(nullptr);
+    packet->set_packet_type(type);
+    switch (type) {
+      case RtpPacketToSend::Type::kAudio:
+        packet->SetSsrc(kAudioSsrc);
+        break;
+      case RtpPacketToSend::Type::kVideo:
+        packet->SetSsrc(kVideoSsrc);
+        break;
+      case RtpPacketToSend::Type::kRetransmission:
+      case RtpPacketToSend::Type::kPadding:
+        packet->SetSsrc(kVideoRtxSsrc);
+        break;
+      case RtpPacketToSend::Type::kForwardErrorCorrection:
+        packet->SetSsrc(kFlexFecSsrc);
+        break;
+    }
+
+    packet->SetPayloadSize(kDefaultPacketSize);
+    return packet;
+  }
+
+  std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
+      RtpPacketToSend::Type type,
+      size_t num_packets) {
+    std::vector<std::unique_ptr<RtpPacketToSend>> packets;
+    for (size_t i = 0; i < num_packets; ++i) {
+      packets.push_back(BuildRtpPacket(type));
+    }
+    return packets;
+  }
+
+  Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); }
+
+  GlobalSimulatedTimeController time_controller_;
+  MockPacketRouter packet_router_;
+  TaskQueuePacedSender pacer_;
+};
+
+TEST_F(TaskQueuePacedSenderTest, PacesPackets) {
+  // Insert a number of packets, covering one second.
+  static constexpr size_t kPacketsToSend = 42;
+  pacer_.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
+                        DataRate::Zero());
+  pacer_.EnqueuePackets(
+      GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsToSend));
+
+  // Expect all of them to be sent.
+  size_t packets_sent = 0;
+  Timestamp end_time = Timestamp::PlusInfinity();
+  EXPECT_CALL(packet_router_, SendPacket)
+      .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+                          const PacedPacketInfo& cluster_info) {
+        ++packets_sent;
+        if (packets_sent == kPacketsToSend) {
+          end_time = time_controller_.GetClock()->CurrentTime();
+        }
+      });
+
+  const Timestamp start_time = time_controller_.GetClock()->CurrentTime();
+
+  // Packets should be sent over a period of close to 1s. Expect a little lower
+  // than this since initial probing is a bit quicker.
+  time_controller_.Sleep(TimeDelta::seconds(1));
+  EXPECT_EQ(packets_sent, kPacketsToSend);
+  ASSERT_TRUE(end_time.IsFinite());
+  EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
+}
+
+TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
+  // Insert a number of packets to be sent 200ms apart.
+  const size_t kPacketsPerSecond = 5;
+  const DataRate kPacingRate =
+      DataRate::bps(kDefaultPacketSize * 8 * kPacketsPerSecond);
+  pacer_.SetPacingRates(kPacingRate, DataRate::Zero());
+
+  // Send some initial packets to be rid of any probes.
+  EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond);
+  pacer_.EnqueuePackets(
+      GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsPerSecond));
+  time_controller_.Sleep(TimeDelta::seconds(1));
+
+  // Insert three packets, and record send time of each of them.
+  // After the second packet is sent, double the send rate so we can
+  // check the third packets is sent after half the wait time.
+  Timestamp first_packet_time = Timestamp::MinusInfinity();
+  Timestamp second_packet_time = Timestamp::MinusInfinity();
+  Timestamp third_packet_time = Timestamp::MinusInfinity();
+
+  EXPECT_CALL(packet_router_, SendPacket)
+      .Times(3)
+      .WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
+                          const PacedPacketInfo& cluster_info) {
+        if (first_packet_time.IsInfinite()) {
+          first_packet_time = CurrentTime();
+        } else if (second_packet_time.IsInfinite()) {
+          second_packet_time = CurrentTime();
+          pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero());
+        } else {
+          third_packet_time = CurrentTime();
+        }
+      });
+
+  pacer_.EnqueuePackets(GeneratePackets(RtpPacketToSend::Type::kVideo, 3));
+  time_controller_.Sleep(TimeDelta::ms(500));
+  ASSERT_TRUE(third_packet_time.IsFinite());
+  EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
+              1.0);
+  EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
+              1.0);
+}
+
+}  // namespace test
+}  // namespace webrtc