Add support within PacedSender and pacer queue for owning rtp packets.

This CL builds on https://webrtc-review.googlesource.com/c/src/+/142165
It adds the parts within the paced sender that uses those send methods.
A follow-up will add the pre-pacer RTP sender parts. That CL will also
add proper integration testing. Here, I mostly add coverage for the new
send methods. When the old code-path is removed, all tests need to be
converted to exclusively use the owned path.

Bug: webrtc:10633
Change-Id: I870d9a2285f07a7b7b0ef6758aa310808f210f28
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/142179
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28308}
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index a35ba85..9c9f7d9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -85,6 +85,7 @@
       "../rtp_rtcp",
       "../rtp_rtcp:mock_rtp_rtcp",
       "../rtp_rtcp:rtp_rtcp_format",
+      "//third_party/abseil-cpp/absl/memory",
     ]
   }
 
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 3e36f14..6177ca6 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -44,6 +44,27 @@
   return field_trials.Lookup(key).find("Enabled") == 0;
 }
 
+int GetPriorityForType(RtpPacketToSend::Type type) {
+  switch (type) {
+    case RtpPacketToSend::Type::kAudio:
+      // Audio is always prioritized over other packet types.
+      return 0;
+    case RtpPacketToSend::Type::kRetransmission:
+      // Send retransmissions before new media.
+      return 1;
+    case RtpPacketToSend::Type::kVideo:
+      // Video has "normal" priority, in the old speak.
+      return 2;
+    case RtpPacketToSend::Type::kForwardErrorCorrection:
+      // Redundancy is OK to drop, but the content is hopefully not useless.
+      return 3;
+    case RtpPacketToSend::Type::kPadding:
+      // Packets that are in themselves likely useless, only sent to keep the
+      // BWE high.
+      return 4;
+  }
+}
+
 }  // namespace
 const int64_t PacedSender::kMaxQueueLengthMs = 2000;
 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@@ -186,9 +207,37 @@
   if (capture_time_ms < 0)
     capture_time_ms = now_ms;
 
-  packets_.Push(RoundRobinPacketQueue::Packet(
-      priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes,
-      retransmission, packet_counter_++));
+  RtpPacketToSend::Type type;
+  switch (priority) {
+    case RtpPacketPacer::kHighPriority:
+      type = RtpPacketToSend::Type::kAudio;
+      break;
+    case RtpPacketPacer::kNormalPriority:
+      type = RtpPacketToSend::Type::kRetransmission;
+      break;
+    default:
+      type = RtpPacketToSend::Type::kVideo;
+  }
+  packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
+                capture_time_ms, now_ms, bytes, retransmission,
+                packet_counter_++);
+}
+
+void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
+  rtc::CritScope cs(&critsect_);
+  RTC_DCHECK(pacing_bitrate_kbps_ > 0)
+      << "SetPacingRate must be called before InsertPacket.";
+
+  int64_t now_ms = TimeMilliseconds();
+  prober_.OnIncomingPacket(packet->payload_size());
+
+  if (packet->capture_time_ms() < 0) {
+    packet->set_capture_time_ms(now_ms);
+  }
+
+  RTC_CHECK(packet->packet_type());
+  int priority = GetPriorityForType(*packet->packet_type());
+  packets_.Push(priority, now_ms, packet_counter_++, std::move(packet));
 }
 
 void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@@ -324,27 +373,43 @@
   // The paused state is checked in the loop since it leaves the critical
   // section allowing the paused state to be changed from other code.
   while (!packets_.Empty() && !paused_) {
-    const auto* packet = GetPendingPacket(pacing_info);
+    auto* packet = GetPendingPacket(pacing_info);
     if (packet == nullptr)
       break;
 
+    std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
+    const bool owned_rtp_packet = rtp_packet != nullptr;
+
     critsect_.Leave();
-    RtpPacketSendResult success = packet_router_->TimeToSendPacket(
-        packet->ssrc, packet->sequence_number, packet->capture_time_ms,
-        packet->retransmission, pacing_info);
+
+    RtpPacketSendResult success;
+    if (rtp_packet != nullptr) {
+      packet_router_->SendPacket(std::move(rtp_packet), pacing_info);
+      success = RtpPacketSendResult::kSuccess;
+    } else {
+      success = packet_router_->TimeToSendPacket(
+          packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
+          packet->is_retransmission(), pacing_info);
+    }
+
     critsect_.Enter();
     if (success == RtpPacketSendResult::kSuccess ||
         success == RtpPacketSendResult::kPacketNotFound) {
       // Packet sent or invalid packet, remove it from queue.
       // TODO(webrtc:8052): Don't consume media budget on kInvalid.
-      bytes_sent += packet->bytes;
+      bytes_sent += packet->size_in_bytes();
       // Send succeeded, remove it from the queue.
       OnPacketSent(packet);
       if (is_probing && bytes_sent > recommended_probe_size)
         break;
+    } else if (owned_rtp_packet) {
+      // Send failed, but we can't put it back in the queue, remove it without
+      // consuming budget.
+      packets_.FinalizePop();
+      break;
     } else {
       // Send failed, put it back into the queue.
-      packets_.CancelPop(*packet);
+      packets_.CancelPop();
       break;
     }
   }
@@ -379,34 +444,34 @@
   process_thread_ = process_thread;
 }
 
-const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket(
+RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
     const PacedPacketInfo& pacing_info) {
   // Since we need to release the lock in order to send, we first pop the
   // element from the priority queue but keep it in storage, so that we can
   // reinsert it if send fails.
-  const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop();
-  bool audio_packet = packet->priority == kHighPriority;
+  RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop();
+  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
   bool apply_pacing = !audio_packet || pace_audio_;
   if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
                                        pacing_info.probe_cluster_id ==
                                            PacedPacketInfo::kNotAProbe))) {
-    packets_.CancelPop(*packet);
+    packets_.CancelPop();
     return nullptr;
   }
   return packet;
 }
 
-void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) {
+void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
   if (first_sent_packet_ms_ == -1)
     first_sent_packet_ms_ = TimeMilliseconds();
-  bool audio_packet = packet->priority == kHighPriority;
+  bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
   if (!audio_packet || account_for_audio_) {
     // Update media bytes sent.
-    UpdateBudgetWithBytesSent(packet->bytes);
+    UpdateBudgetWithBytesSent(packet->size_in_bytes());
     last_send_time_us_ = clock_->TimeInMicroseconds();
   }
   // Send succeeded, remove it from the queue.
-  packets_.FinalizePop(*packet);
+  packets_.FinalizePop();
 }
 
 void PacedSender::OnPaddingSent(size_t bytes_sent) {
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index eb98ca2..c67e162 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -25,7 +25,9 @@
 #include "modules/pacing/interval_budget.h"
 #include "modules/pacing/packet_router.h"
 #include "modules/pacing/round_robin_packet_queue.h"
+#include "modules/rtp_rtcp/include/rtp_packet_pacer.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
 #include "modules/utility/include/process_thread.h"
 #include "rtc_base/critical_section.h"
 #include "rtc_base/experiments/field_trial_parser.h"
@@ -35,7 +37,7 @@
 class Clock;
 class RtcEventLog;
 
-class PacedSender : public Module, public RtpPacketSender {
+class PacedSender : public Module, public RtpPacketPacer {
  public:
   static constexpr int64_t kNoCongestionWindow = -1;
 
@@ -77,8 +79,8 @@
   // Sets the pacing rates. Must be called once before packets can be sent.
   void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps);
 
-  // Returns true if we send the packet now, else it will add the packet
-  // information to the queue and call TimeToSendPacket when it's time to send.
+  // Adds the packet information to the queue and calls TimeToSendPacket
+  // when it's time to send.
   void InsertPacket(RtpPacketSender::Priority priority,
                     uint32_t ssrc,
                     uint16_t sequence_number,
@@ -86,6 +88,10 @@
                     size_t bytes,
                     bool retransmission) override;
 
+  // Adds the packet to the queue and calls PacketRouter::SendPacket() when
+  // it's time to send.
+  void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) override;
+
   // Currently audio traffic is not accounted by pacer and passed through.
   // With the introduction of audio BWE audio traffic will be accounted for
   // the pacer budget calculation. The audio traffic still will be injected
@@ -129,10 +135,10 @@
   void UpdateBudgetWithBytesSent(size_t bytes)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
-  const RoundRobinPacketQueue::Packet* GetPendingPacket(
+  RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
       const PacedPacketInfo& pacing_info)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
-  void OnPacketSent(const RoundRobinPacketQueue::Packet* packet)
+  void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
   void OnPaddingSent(size_t padding_sent)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index d991d61..d630980 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -12,6 +12,7 @@
 #include <memory>
 #include <string>
 
+#include "absl/memory/memory.h"
 #include "modules/pacing/paced_sender.h"
 #include "modules/pacing/packet_router.h"
 #include "system_wrappers/include/clock.h"
@@ -22,6 +23,8 @@
 
 using ::testing::_;
 using ::testing::Field;
+using ::testing::Pointee;
+using ::testing::Property;
 using ::testing::Return;
 
 namespace {
@@ -34,6 +37,11 @@
 constexpr int kBitrateProbingError = 150000;
 
 const float kPaceMultiplier = 2.5f;
+
+constexpr uint32_t kAudioSsrc = 12345;
+constexpr uint32_t kVideoSsrc = 234565;
+constexpr uint32_t kVideoRtxSsrc = 34567;
+constexpr uint32_t kFlexFecSsrc = 45678;
 }  // namespace
 
 namespace webrtc {
@@ -49,6 +57,9 @@
                                    int64_t capture_time_ms,
                                    bool retransmission,
                                    const PacedPacketInfo& pacing_info));
+  MOCK_METHOD2(SendPacket,
+               void(std::unique_ptr<RtpPacketToSend> packet,
+                    const PacedPacketInfo& pacing_info));
   MOCK_METHOD2(TimeToSendPadding,
                size_t(size_t bytes, const PacedPacketInfo& pacing_info));
 };
@@ -139,6 +150,30 @@
         .Times(1)
         .WillRepeatedly(Return(RtpPacketSendResult::kSuccess));
   }
+
+  std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+    auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
+    packet->set_packet_type(type);
+    switch (type) {
+      case RtpPacketToSend::Type::kAudio:
+        packet->SetSsrc(kAudioSsrc);
+        break;
+      case RtpPacketToSend::Type::kVideo:
+        packet->SetSsrc(kVideoSsrc);
+        break;
+      case RtpPacketToSend::Type::kRetransmission:
+      case RtpPacketToSend::Type::kPadding:
+        packet->SetSsrc(kVideoRtxSsrc);
+        break;
+      case RtpPacketToSend::Type::kForwardErrorCorrection:
+        packet->SetSsrc(kFlexFecSsrc);
+        break;
+    }
+
+    packet->SetPayloadSize(234);
+    return packet;
+  }
+
   SimulatedClock clock_;
   MockPacedSenderCallback callback_;
   std::unique_ptr<PacedSender> send_bucket_;
@@ -1292,6 +1327,39 @@
   EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
 }
 
+TEST_F(PacedSenderTest, OwnedPacketPrioritizedOnType) {
+  // Insert a packet of each type, from low to high priority. Since priority
+  // is weighted higher than insert order, these should come out of the pacer
+  // in backwards order.
+  for (RtpPacketToSend::Type type :
+       {RtpPacketToSend::Type::kPadding,
+        RtpPacketToSend::Type::kForwardErrorCorrection,
+        RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission,
+        RtpPacketToSend::Type::kAudio}) {
+    send_bucket_->EnqueuePacket(BuildRtpPacket(type));
+  }
+
+  ::testing::InSequence seq;
+  EXPECT_CALL(
+      callback_,
+      SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
+  EXPECT_CALL(
+      callback_,
+      SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+  EXPECT_CALL(
+      callback_,
+      SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
+  EXPECT_CALL(
+      callback_,
+      SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
+  EXPECT_CALL(
+      callback_,
+      SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+
+  clock_.AdvanceTimeMilliseconds(200);
+  send_bucket_->Process();
+}
+
 // TODO(philipel): Move to PacketQueue2 unittests.
 #if 0
 TEST_F(PacedSenderTest, QueueTimeWithPause) {
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a7c2b93..6d2c7ff 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -13,6 +13,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <limits>
+#include <utility>
 
 #include "absl/types/optional.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp.h"
@@ -20,6 +21,7 @@
 #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
 #include "rtc_base/atomic_ops.h"
 #include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
 #include "rtc_base/time_utils.h"
 
 namespace webrtc {
@@ -125,6 +127,29 @@
   return RtpPacketSendResult::kPacketNotFound;
 }
 
+void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
+                              const PacedPacketInfo& cluster_info) {
+  rtc::CritScope cs(&modules_crit_);
+  for (auto* rtp_module : rtp_send_modules_) {
+    if (rtp_module->TrySendPacket(packet.get(), cluster_info)) {
+      const bool can_send_padding =
+          (rtp_module->RtxSendStatus() & kRtxRedundantPayloads) &&
+          rtp_module->HasBweExtensions();
+      if (can_send_padding) {
+        // This is now the last module to send media, and has the desired
+        // properties needed for payload based padding. Cache it for later use.
+        last_send_module_ = rtp_module;
+      }
+      return;
+    }
+  }
+
+  RTC_LOG(LS_WARNING) << "Failed to send packet, matching RTP module not found "
+                         "or transport error. SSRC = "
+                      << packet->Ssrc() << ", sequence number "
+                      << packet->SequenceNumber();
+}
+
 size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send,
                                        const PacedPacketInfo& pacing_info) {
   size_t total_bytes_sent = 0;
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 4ff5a0b..9a51899 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -14,11 +14,13 @@
 #include <stddef.h>
 #include <stdint.h>
 #include <list>
+#include <memory>
 #include <vector>
 
 #include "api/transport/network_types.h"
 #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
 #include "rtc_base/constructor_magic.h"
 #include "rtc_base/critical_section.h"
 #include "rtc_base/thread_annotations.h"
@@ -30,7 +32,7 @@
 class TransportFeedback;
 }  // namespace rtcp
 
-// PacketRouter keeps track of RTP send modules to support the pacer.
+// PacketRouter keeps track of rtp send modules to support the pacer.
 // In addition, it handles feedback messages, which are sent on a send
 // module if possible (sender report), otherwise on receive module
 // (receiver report). For the latter case, we also keep track of the
@@ -56,6 +58,9 @@
       bool retransmission,
       const PacedPacketInfo& packet_info);
 
+  virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
+                          const PacedPacketInfo& cluster_info);
+
   virtual size_t TimeToSendPadding(size_t bytes,
                                    const PacedPacketInfo& packet_info);
 
diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc
index a87c47c..9f52a6a 100644
--- a/modules/pacing/round_robin_packet_queue.cc
+++ b/modules/pacing/round_robin_packet_queue.cc
@@ -18,36 +18,53 @@
 
 namespace webrtc {
 
-RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
-                                      uint32_t ssrc,
-                                      uint16_t seq_number,
-                                      int64_t capture_time_ms,
-                                      int64_t enqueue_time_ms,
-                                      size_t length_in_bytes,
-                                      bool retransmission,
-                                      uint64_t enqueue_order)
-    : priority(priority),
-      ssrc(ssrc),
-      sequence_number(seq_number),
-      capture_time_ms(capture_time_ms),
-      enqueue_time_ms(enqueue_time_ms),
-      sum_paused_ms(0),
-      bytes(length_in_bytes),
-      retransmission(retransmission),
-      enqueue_order(enqueue_order) {}
+RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
+    default;
+RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
 
-RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default;
+RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
+    int priority,
+    RtpPacketToSend::Type type,
+    uint32_t ssrc,
+    uint16_t seq_number,
+    int64_t capture_time_ms,
+    int64_t enqueue_time_ms,
+    size_t length_in_bytes,
+    bool retransmission,
+    uint64_t enqueue_order,
+    std::multiset<int64_t>::iterator enqueue_time_it,
+    absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+        packet_it)
+    : type_(type),
+      priority_(priority),
+      ssrc_(ssrc),
+      sequence_number_(seq_number),
+      capture_time_ms_(capture_time_ms),
+      enqueue_time_ms_(enqueue_time_ms),
+      bytes_(length_in_bytes),
+      retransmission_(retransmission),
+      enqueue_order_(enqueue_order),
+      enqueue_time_it_(enqueue_time_it),
+      packet_it_(packet_it) {}
 
-RoundRobinPacketQueue::Packet::~Packet() {}
+std::unique_ptr<RtpPacketToSend>
+RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
+  return packet_it_ ? std::move(**packet_it_) : nullptr;
+}
 
-bool RoundRobinPacketQueue::Packet::operator<(
-    const RoundRobinPacketQueue::Packet& other) const {
-  if (priority != other.priority)
-    return priority > other.priority;
-  if (retransmission != other.retransmission)
-    return other.retransmission;
+void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs(
+    int64_t pause_time_sum_ms) {
+  enqueue_time_ms_ -= pause_time_sum_ms;
+}
 
-  return enqueue_order > other.enqueue_order;
+bool RoundRobinPacketQueue::QueuedPacket::operator<(
+    const RoundRobinPacketQueue::QueuedPacket& other) const {
+  if (priority_ != other.priority_)
+    return priority_ > other.priority_;
+  if (retransmission_ != other.retransmission_)
+    return other.retransmission_;
+
+  return enqueue_order_ > other.enqueue_order_;
 }
 
 RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
@@ -59,50 +76,41 @@
 
 RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
 
-void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) {
-  Packet packet(packet_to_insert);
-
-  auto stream_info_it = streams_.find(packet.ssrc);
-  if (stream_info_it == streams_.end()) {
-    stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
-    stream_info_it->second.priority_it = stream_priorities_.end();
-    stream_info_it->second.ssrc = packet.ssrc;
-  }
-
-  Stream* stream = &stream_info_it->second;
-
-  if (stream->priority_it == stream_priorities_.end()) {
-    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
-    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
-    stream->priority_it = stream_priorities_.emplace(
-        StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
-  } else if (packet.priority < stream->priority_it->first.priority) {
-    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
-    // and insert a new one with the new priority. Note that
-    // RtpPacketSender::Priority uses lower ordinal for higher priority.
-    stream_priorities_.erase(stream->priority_it);
-    stream->priority_it = stream_priorities_.emplace(
-        StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
-  }
-  RTC_CHECK(stream->priority_it != stream_priorities_.end());
-
-  packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
-
-  // In order to figure out how much time a packet has spent in the queue while
-  // not in a paused state, we subtract the total amount of time the queue has
-  // been paused so far, and when the packet is poped we subtract the total
-  // amount of time the queue has been paused at that moment. This way we
-  // subtract the total amount of time the packet has spent in the queue while
-  // in a paused state.
-  UpdateQueueTime(packet.enqueue_time_ms);
-  packet.enqueue_time_ms -= pause_time_sum_ms_;
-  stream->packet_queue.push(packet);
-
-  size_packets_ += 1;
-  size_bytes_ += packet.bytes;
+void RoundRobinPacketQueue::Push(int priority,
+                                 RtpPacketToSend::Type type,
+                                 uint32_t ssrc,
+                                 uint16_t seq_number,
+                                 int64_t capture_time_ms,
+                                 int64_t enqueue_time_ms,
+                                 size_t length_in_bytes,
+                                 bool retransmission,
+                                 uint64_t enqueue_order) {
+  Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
+                    enqueue_time_ms, length_in_bytes, retransmission,
+                    enqueue_order, enqueue_times_.insert(enqueue_time_ms),
+                    absl::nullopt));
 }
 
-const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() {
+void RoundRobinPacketQueue::Push(int priority,
+                                 int64_t enqueue_time_ms,
+                                 uint64_t enqueue_order,
+                                 std::unique_ptr<RtpPacketToSend> packet) {
+  uint32_t ssrc = packet->Ssrc();
+  uint16_t sequence_number = packet->SequenceNumber();
+  int64_t capture_time_ms = packet->capture_time_ms();
+  size_t size_bytes = packet->payload_size();
+  auto type = packet->packet_type();
+  RTC_DCHECK(type.has_value());
+
+  rtp_packets_.push_front(std::move(packet));
+  Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms,
+                    enqueue_time_ms, size_bytes,
+                    *type == RtpPacketToSend::Type::kRetransmission,
+                    enqueue_order, enqueue_times_.insert(enqueue_time_ms),
+                    rtp_packets_.begin()));
+}
+
+RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
   RTC_CHECK(!pop_packet_ && !pop_stream_);
 
   Stream* stream = GetHighestPriorityStream();
@@ -110,22 +118,22 @@
   pop_packet_.emplace(stream->packet_queue.top());
   stream->packet_queue.pop();
 
-  return *pop_packet_;
+  return &pop_packet_.value();
 }
 
-void RoundRobinPacketQueue::CancelPop(const Packet& packet) {
+void RoundRobinPacketQueue::CancelPop() {
   RTC_CHECK(pop_packet_ && pop_stream_);
   (*pop_stream_)->packet_queue.push(*pop_packet_);
   pop_packet_.reset();
   pop_stream_.reset();
 }
 
-void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
+void RoundRobinPacketQueue::FinalizePop() {
   if (!Empty()) {
     RTC_CHECK(pop_packet_ && pop_stream_);
     Stream* stream = *pop_stream_;
     stream_priorities_.erase(stream->priority_it);
-    const Packet& packet = *pop_packet_;
+    const QueuedPacket& packet = *pop_packet_;
 
     // Calculate the total amount of time spent by this packet in the queue
     // while in a non-paused state. Note that the |pause_time_sum_ms_| was
@@ -133,11 +141,16 @@
     // by subtracting it now we effectively remove the time spent in in the
     // queue while in a paused state.
     int64_t time_in_non_paused_state_ms =
-        time_last_updated_ms_ - packet.enqueue_time_ms - pause_time_sum_ms_;
+        time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_;
     queue_time_sum_ms_ -= time_in_non_paused_state_ms;
 
-    RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
-    enqueue_times_.erase(packet.enqueue_time_it);
+    RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
+    enqueue_times_.erase(packet.EnqueueTimeIterator());
+
+    auto packet_it = packet.PacketIterator();
+    if (packet_it) {
+      rtp_packets_.erase(*packet_it);
+    }
 
     // Update |bytes| of this stream. The general idea is that the stream that
     // has sent the least amount of bytes should have the highest priority.
@@ -145,11 +158,11 @@
     // case a "budget" will be built up for the stream sending at the lower
     // rate. To avoid building a too large budget we limit |bytes| to be within
     // kMaxLeading bytes of the stream that has sent the most amount of bytes.
-    stream->bytes =
-        std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
+    stream->bytes = std::max(stream->bytes + packet.size_in_bytes(),
+                             max_bytes_ - kMaxLeadingBytes);
     max_bytes_ = std::max(max_bytes_, stream->bytes);
 
-    size_bytes_ -= packet.bytes;
+    size_bytes_ -= packet.size_in_bytes();
     size_packets_ -= 1;
     RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
 
@@ -158,7 +171,7 @@
     if (stream->packet_queue.empty()) {
       stream->priority_it = stream_priorities_.end();
     } else {
-      RtpPacketSender::Priority priority = stream->packet_queue.top().priority;
+      int priority = stream->packet_queue.top().priority();
       stream->priority_it = stream_priorities_.emplace(
           StreamPrioKey(priority, stream->bytes), stream->ssrc);
     }
@@ -218,6 +231,46 @@
   return queue_time_sum_ms_ / size_packets_;
 }
 
+void RoundRobinPacketQueue::Push(QueuedPacket packet) {
+  auto stream_info_it = streams_.find(packet.ssrc());
+  if (stream_info_it == streams_.end()) {
+    stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
+    stream_info_it->second.priority_it = stream_priorities_.end();
+    stream_info_it->second.ssrc = packet.ssrc();
+  }
+
+  Stream* stream = &stream_info_it->second;
+
+  if (stream->priority_it == stream_priorities_.end()) {
+    // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
+    RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
+    stream->priority_it = stream_priorities_.emplace(
+        StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
+  } else if (packet.priority() < stream->priority_it->first.priority) {
+    // If the priority of this SSRC increased, remove the outdated StreamPrioKey
+    // and insert a new one with the new priority. Note that |priority_| uses
+    // lower ordinal for higher priority.
+    stream_priorities_.erase(stream->priority_it);
+    stream->priority_it = stream_priorities_.emplace(
+        StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
+  }
+  RTC_CHECK(stream->priority_it != stream_priorities_.end());
+
+  // In order to figure out how much time a packet has spent in the queue while
+  // not in a paused state, we subtract the total amount of time the queue has
+  // been paused so far, and when the packet is popped we subtract the total
+  // amount of time the queue has been paused at that moment. This way we
+  // subtract the total amount of time the packet has spent in the queue while
+  // in a paused state.
+  UpdateQueueTime(packet.enqueue_time_ms());
+  packet.SubtractPauseTimeMs(pause_time_sum_ms_);
+
+  size_packets_ += 1;
+  size_bytes_ += packet.size_in_bytes();
+
+  stream->packet_queue.push(packet);
+}
+
 RoundRobinPacketQueue::Stream*
 RoundRobinPacketQueue::GetHighestPriorityStream() {
   RTC_CHECK(!stream_priorities_.empty());
diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h
index 74b855a..812ae87 100644
--- a/modules/pacing/round_robin_packet_queue.h
+++ b/modules/pacing/round_robin_packet_queue.h
@@ -15,11 +15,13 @@
 #include <stdint.h>
 #include <list>
 #include <map>
+#include <memory>
 #include <queue>
 #include <set>
 
 #include "absl/types/optional.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
 #include "system_wrappers/include/clock.h"
 
 namespace webrtc {
@@ -29,36 +31,80 @@
   explicit RoundRobinPacketQueue(int64_t start_time_us);
   ~RoundRobinPacketQueue();
 
-  struct Packet {
-    Packet(RtpPacketSender::Priority priority,
-           uint32_t ssrc,
-           uint16_t seq_number,
-           int64_t capture_time_ms,
-           int64_t enqueue_time_ms,
-           size_t length_in_bytes,
-           bool retransmission,
-           uint64_t enqueue_order);
-    Packet(const Packet& other);
-    virtual ~Packet();
-    bool operator<(const Packet& other) const;
+  struct QueuedPacket {
+   public:
+    QueuedPacket(
+        int priority,
+        RtpPacketToSend::Type type,
+        uint32_t ssrc,
+        uint16_t seq_number,
+        int64_t capture_time_ms,
+        int64_t enqueue_time_ms,
+        size_t length_in_bytes,
+        bool retransmission,
+        uint64_t enqueue_order,
+        std::multiset<int64_t>::iterator enqueue_time_it,
+        absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+            packet_it);
+    QueuedPacket(const QueuedPacket& rhs);
+    ~QueuedPacket();
 
-    RtpPacketSender::Priority priority;
-    uint32_t ssrc;
-    uint16_t sequence_number;
-    int64_t capture_time_ms;  // Absolute time of frame capture.
-    int64_t enqueue_time_ms;  // Absolute time of pacer queue entry.
-    int64_t sum_paused_ms;
-    size_t bytes;
-    bool retransmission;
-    uint64_t enqueue_order;
-    std::list<Packet>::iterator this_it;
-    std::multiset<int64_t>::iterator enqueue_time_it;
+    bool operator<(const QueuedPacket& other) const;
+
+    int priority() const { return priority_; }
+    RtpPacketToSend::Type type() const { return type_; }
+    uint32_t ssrc() const { return ssrc_; }
+    uint16_t sequence_number() const { return sequence_number_; }
+    int64_t capture_time_ms() const { return capture_time_ms_; }
+    int64_t enqueue_time_ms() const { return enqueue_time_ms_; }
+    size_t size_in_bytes() const { return bytes_; }
+    bool is_retransmission() const { return retransmission_; }
+    uint64_t enqueue_order() const { return enqueue_order_; }
+    std::unique_ptr<RtpPacketToSend> ReleasePacket();
+
+    // For internal use.
+    absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+    PacketIterator() const {
+      return packet_it_;
+    }
+    std::multiset<int64_t>::iterator EnqueueTimeIterator() const {
+      return enqueue_time_it_;
+    }
+    void SubtractPauseTimeMs(int64_t pause_time_sum_ms);
+
+   private:
+    RtpPacketToSend::Type type_;
+    int priority_;
+    uint32_t ssrc_;
+    uint16_t sequence_number_;
+    int64_t capture_time_ms_;  // Absolute time of frame capture.
+    int64_t enqueue_time_ms_;  // Absolute time of pacer queue entry.
+    size_t bytes_;
+    bool retransmission_;
+    uint64_t enqueue_order_;
+    std::multiset<int64_t>::iterator enqueue_time_it_;
+    // Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
+    // if applicable.
+    absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+        packet_it_;
   };
 
-  void Push(const Packet& packet);
-  const Packet& BeginPop();
-  void CancelPop(const Packet& packet);
-  void FinalizePop(const Packet& packet);
+  void Push(int priority,
+            RtpPacketToSend::Type type,
+            uint32_t ssrc,
+            uint16_t seq_number,
+            int64_t capture_time_ms,
+            int64_t enqueue_time_ms,
+            size_t length_in_bytes,
+            bool retransmission,
+            uint64_t enqueue_order);
+  void Push(int priority,
+            int64_t enqueue_time_ms,
+            uint64_t enqueue_order,
+            std::unique_ptr<RtpPacketToSend> packet);
+  QueuedPacket* BeginPop();
+  void CancelPop();
+  void FinalizePop();
 
   bool Empty() const;
   size_t SizeInPackets() const;
@@ -71,7 +117,7 @@
 
  private:
   struct StreamPrioKey {
-    StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes)
+    StreamPrioKey(int priority, int64_t bytes)
         : priority(priority), bytes(bytes) {}
 
     bool operator<(const StreamPrioKey& other) const {
@@ -80,7 +126,7 @@
       return bytes < other.bytes;
     }
 
-    const RtpPacketSender::Priority priority;
+    const int priority;
     const size_t bytes;
   };
 
@@ -92,7 +138,7 @@
 
     size_t bytes;
     uint32_t ssrc;
-    std::priority_queue<Packet> packet_queue;
+    std::priority_queue<QueuedPacket> packet_queue;
 
     // Whenever a packet is inserted for this stream we check if |priority_it|
     // points to an element in |stream_priorities_|, and if it does it means
@@ -104,13 +150,15 @@
 
   static constexpr size_t kMaxLeadingBytes = 1400;
 
+  void Push(QueuedPacket packet);
+
   Stream* GetHighestPriorityStream();
 
   // Just used to verify correctness.
   bool IsSsrcScheduled(uint32_t ssrc) const;
 
   int64_t time_last_updated_ms_;
-  absl::optional<Packet> pop_packet_;
+  absl::optional<QueuedPacket> pop_packet_;
   absl::optional<Stream*> pop_stream_;
 
   bool paused_ = false;
@@ -132,6 +180,12 @@
   // The enqueue time of every packet currently in the queue. Used to figure out
   // the age of the oldest packet in the queue.
   std::multiset<int64_t> enqueue_times_;
+
+  // List of RTP packets to be sent, not necessarily in the order they will be
+  // sent. PacketInfo.packet_it will point to an entry in this list, or the
+  // end iterator of this list if queue does not have direct ownership of the
+  // packet.
+  std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
 };
 }  // namespace webrtc
 
diff --git a/modules/rtp_rtcp/include/rtp_packet_pacer.h b/modules/rtp_rtcp/include/rtp_packet_pacer.h
index 9820fc2..180ddf7 100644
--- a/modules/rtp_rtcp/include/rtp_packet_pacer.h
+++ b/modules/rtp_rtcp/include/rtp_packet_pacer.h
@@ -23,10 +23,10 @@
 // TODO(bugs.webrtc.org/10633): Add things missing to this interface so that we
 // can use multiple different pacer implementations, and stop inheriting from
 // RtpPacketSender.
-class RtpPacketPacer : RtpPacketSender {
+class RtpPacketPacer : public RtpPacketSender {
  public:
   RtpPacketPacer() = default;
-  ~RtpPacketPacer() override;
+  ~RtpPacketPacer() override = default;
 
   // Insert packet into queue, for eventual transmission. Based on the type of
   // the packet, it will prioritized and scheduled relative to other packets and
diff --git a/modules/rtp_rtcp/include/rtp_rtcp.h b/modules/rtp_rtcp/include/rtp_rtcp.h
index 6734e6c..dbe6345 100644
--- a/modules/rtp_rtcp/include/rtp_rtcp.h
+++ b/modules/rtp_rtcp/include/rtp_rtcp.h
@@ -26,6 +26,7 @@
 #include "modules/rtp_rtcp/include/receive_statistics.h"
 #include "modules/rtp_rtcp/include/report_block_data.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
 #include "modules/rtp_rtcp/source/rtp_sender.h"
 #include "rtc_base/constructor_magic.h"
 #include "rtc_base/deprecation.h"