Move PacketQueue out of paced_sender.cc to its own packet_queue.{cc,h}.

Bug: webrtc:8287, webrtc:8288
Change-Id: If8937458c5b8f5a75b3de441aa409ae873f4bda2
Reviewed-on: https://webrtc-review.googlesource.com/3761
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20003}
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 4882d4f..f72d3f9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -19,6 +19,8 @@
     "paced_sender.cc",
     "paced_sender.h",
     "pacer.h",
+    "packet_queue.cc",
+    "packet_queue.h",
     "packet_router.cc",
     "packet_router.h",
   ]
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 785a2de..96355b3 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -37,201 +37,7 @@
 
 }  // namespace
 
-// TODO(sprang): Move at least PacketQueue out to separate files, so that we can
-// more easily test them.
-
 namespace webrtc {
-namespace paced_sender {
-struct Packet {
-  Packet(RtpPacketSender::Priority priority,
-         uint32_t ssrc,
-         uint16_t seq_number,
-         int64_t capture_time_ms,
-         int64_t enqueue_time_ms,
-         size_t length_in_bytes,
-         bool retransmission,
-         uint64_t enqueue_order)
-      : priority(priority),
-        ssrc(ssrc),
-        sequence_number(seq_number),
-        capture_time_ms(capture_time_ms),
-        enqueue_time_ms(enqueue_time_ms),
-        sum_paused_ms(0),
-        bytes(length_in_bytes),
-        retransmission(retransmission),
-        enqueue_order(enqueue_order) {}
-
-  RtpPacketSender::Priority priority;
-  uint32_t ssrc;
-  uint16_t sequence_number;
-  int64_t capture_time_ms;  // Absolute time of frame capture.
-  int64_t enqueue_time_ms;  // Absolute time of pacer queue entry.
-  int64_t sum_paused_ms;    // Sum of time spent in queue while pacer is paused.
-  size_t bytes;
-  bool retransmission;
-  uint64_t enqueue_order;
-  std::list<Packet>::iterator this_it;
-};
-
-// Used by priority queue to sort packets.
-struct Comparator {
-  bool operator()(const Packet* first, const Packet* second) {
-    // Highest prio = 0.
-    if (first->priority != second->priority)
-      return first->priority > second->priority;
-
-    // Retransmissions go first.
-    if (second->retransmission != first->retransmission)
-      return second->retransmission;
-
-    // Older frames have higher prio.
-    if (first->capture_time_ms != second->capture_time_ms)
-      return first->capture_time_ms > second->capture_time_ms;
-
-    return first->enqueue_order > second->enqueue_order;
-  }
-};
-
-// Class encapsulating a priority queue with some extensions.
-class PacketQueue {
- public:
-  explicit PacketQueue(const Clock* clock)
-      : bytes_(0),
-        clock_(clock),
-        queue_time_sum_(0),
-        time_last_updated_(clock_->TimeInMilliseconds()),
-        paused_(false) {}
-  virtual ~PacketQueue() {}
-
-  void Push(const Packet& packet) {
-    if (!AddToDupeSet(packet))
-      return;
-
-    UpdateQueueTime(packet.enqueue_time_ms);
-
-    // Store packet in list, use pointers in priority queue for cheaper moves.
-    // Packets have a handle to its own iterator in the list, for easy removal
-    // when popping from queue.
-    packet_list_.push_front(packet);
-    std::list<Packet>::iterator it = packet_list_.begin();
-    it->this_it = it;          // Handle for direct removal from list.
-    prio_queue_.push(&(*it));  // Pointer into list.
-    bytes_ += packet.bytes;
-  }
-
-  const Packet& BeginPop() {
-    const Packet& packet = *prio_queue_.top();
-    prio_queue_.pop();
-    return packet;
-  }
-
-  void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
-
-  void FinalizePop(const Packet& packet) {
-    RemoveFromDupeSet(packet);
-    bytes_ -= packet.bytes;
-    int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
-    RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
-    packet_queue_time_ms -= packet.sum_paused_ms;
-    RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
-    queue_time_sum_ -= packet_queue_time_ms;
-    packet_list_.erase(packet.this_it);
-    RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
-    if (packet_list_.empty())
-      RTC_DCHECK_EQ(0, queue_time_sum_);
-  }
-
-  bool Empty() const { return prio_queue_.empty(); }
-
-  size_t SizeInPackets() const { return prio_queue_.size(); }
-
-  uint64_t SizeInBytes() const { return bytes_; }
-
-  int64_t OldestEnqueueTimeMs() const {
-    auto it = packet_list_.rbegin();
-    if (it == packet_list_.rend())
-      return 0;
-    return it->enqueue_time_ms;
-  }
-
-  void UpdateQueueTime(int64_t timestamp_ms) {
-    RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
-    if (timestamp_ms == time_last_updated_)
-      return;
-
-    int64_t delta_ms = timestamp_ms - time_last_updated_;
-
-    if (paused_) {
-      // Increase per-packet accumulators of time spent in queue while paused,
-      // so that we can disregard that when subtracting main accumulator when
-      // popping packet from the queue.
-      for (auto& it : packet_list_) {
-        it.sum_paused_ms += delta_ms;
-      }
-    } else {
-      // Use packet packet_list_.size() not prio_queue_.size() here, as there
-      // might be an outstanding element popped from prio_queue_ currently in
-      // the SendPacket() call, while packet_list_ will always be correct.
-      queue_time_sum_ += delta_ms * packet_list_.size();
-    }
-    time_last_updated_ = timestamp_ms;
-  }
-
-  void SetPauseState(bool paused, int64_t timestamp_ms) {
-    if (paused_ == paused)
-      return;
-    UpdateQueueTime(timestamp_ms);
-    paused_ = paused;
-  }
-
-  int64_t AverageQueueTimeMs() const {
-    if (prio_queue_.empty())
-      return 0;
-    return queue_time_sum_ / packet_list_.size();
-  }
-
- private:
-  // Try to add a packet to the set of ssrc/seqno identifiers currently in the
-  // queue. Return true if inserted, false if this is a duplicate.
-  bool AddToDupeSet(const Packet& packet) {
-    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
-    if (it == dupe_map_.end()) {
-      // First for this ssrc, just insert.
-      dupe_map_[packet.ssrc].insert(packet.sequence_number);
-      return true;
-    }
-
-    // Insert returns a pair, where second is a bool set to true if new element.
-    return it->second.insert(packet.sequence_number).second;
-  }
-
-  void RemoveFromDupeSet(const Packet& packet) {
-    SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
-    RTC_DCHECK(it != dupe_map_.end());
-    it->second.erase(packet.sequence_number);
-    if (it->second.empty()) {
-      dupe_map_.erase(it);
-    }
-  }
-
-  // List of packets, in the order the were enqueued. Since dequeueing may
-  // occur out of order, use list instead of vector.
-  std::list<Packet> packet_list_;
-  // Priority queue of the packets, sorted according to Comparator.
-  // Use pointers into list, to avoid moving whole struct within heap.
-  std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
-  // Total number of bytes in the queue.
-  uint64_t bytes_;
-  // Map<ssrc, std::set<seq_no> >, for checking duplicates.
-  typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
-  SsrcSeqNoMap dupe_map_;
-  const Clock* const clock_;
-  int64_t queue_time_sum_;
-  int64_t time_last_updated_;
-  bool paused_;
-};
-
-}  // namespace paced_sender
 
 const int64_t PacedSender::kMaxQueueLengthMs = 2000;
 const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@@ -253,7 +59,7 @@
       pacing_bitrate_kbps_(0),
       time_last_update_us_(clock->TimeInMicroseconds()),
       first_sent_packet_ms_(-1),
-      packets_(new paced_sender::PacketQueue(clock)),
+      packets_(new PacketQueue(clock)),
       packet_counter_(0),
       pacing_factor_(kDefaultPaceMultiplier),
       queue_time_limit(kMaxQueueLengthMs) {
@@ -342,9 +148,9 @@
   if (capture_time_ms < 0)
     capture_time_ms = now_ms;
 
-  packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
-                                      capture_time_ms, now_ms, bytes,
-                                      retransmission, packet_counter_++));
+  packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number,
+                                     capture_time_ms, now_ms, bytes,
+                                     retransmission, packet_counter_++));
 }
 
 int64_t PacedSender::ExpectedQueueTimeMs() const {
@@ -455,7 +261,7 @@
     // Since we need to release the lock in order to send, we first pop the
     // element from the priority queue but keep it in storage, so that we can
     // reinsert it if send fails.
-    const paced_sender::Packet& packet = packets_->BeginPop();
+    const PacketQueue::Packet& packet = packets_->BeginPop();
 
     if (SendPacket(packet, pacing_info)) {
       // Send succeeded, remove it from the queue.
@@ -496,7 +302,7 @@
   process_thread_ = process_thread;
 }
 
-bool PacedSender::SendPacket(const paced_sender::Packet& packet,
+bool PacedSender::SendPacket(const PacketQueue::Packet& packet,
                              const PacedPacketInfo& pacing_info) {
   RTC_DCHECK(!paused_);
   if (media_budget_->bytes_remaining() == 0 &&
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 754d750..499f1d9 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -11,12 +11,11 @@
 #ifndef MODULES_PACING_PACED_SENDER_H_
 #define MODULES_PACING_PACED_SENDER_H_
 
-#include <list>
 #include <memory>
-#include <set>
 
 #include "api/optional.h"
 #include "modules/pacing/pacer.h"
+#include "modules/pacing/packet_queue.h"
 #include "rtc_base/criticalsection.h"
 #include "rtc_base/thread_annotations.h"
 #include "typedefs.h"  // NOLINT(build/include)
@@ -29,12 +28,6 @@
 class RtcEventLog;
 class IntervalBudget;
 
-namespace paced_sender {
-class IntervalBudget;
-struct Packet;
-class PacketQueue;
-}  // namespace paced_sender
-
 class PacedSender : public Pacer {
  public:
   class PacketSender {
@@ -159,7 +152,7 @@
   void UpdateBudgetWithBytesSent(size_t bytes)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
 
-  bool SendPacket(const paced_sender::Packet& packet,
+  bool SendPacket(const PacketQueue::Packet& packet,
                   const PacedPacketInfo& cluster_info)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
   size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
@@ -191,7 +184,7 @@
   int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_);
   int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
 
-  std::unique_ptr<paced_sender::PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
+  std::unique_ptr<PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
   uint64_t packet_counter_;
   ProcessThread* process_thread_ = nullptr;
 
diff --git a/modules/pacing/packet_queue.cc b/modules/pacing/packet_queue.cc
new file mode 100644
index 0000000..765e43f
--- /dev/null
+++ b/modules/pacing/packet_queue.cc
@@ -0,0 +1,174 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/packet_queue.h"
+
+#include <algorithm>
+#include <list>
+#include <vector>
+
+#include "modules/include/module_common_types.h"
+#include "modules/pacing/alr_detector.h"
+#include "modules/pacing/bitrate_prober.h"
+#include "modules/pacing/interval_budget.h"
+#include "modules/utility/include/process_thread.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "system_wrappers/include/clock.h"
+#include "system_wrappers/include/field_trial.h"
+
+namespace webrtc {
+
+PacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
+                            uint32_t ssrc,
+                            uint16_t seq_number,
+                            int64_t capture_time_ms,
+                            int64_t enqueue_time_ms,
+                            size_t length_in_bytes,
+                            bool retransmission,
+                            uint64_t enqueue_order)
+    : priority(priority),
+      ssrc(ssrc),
+      sequence_number(seq_number),
+      capture_time_ms(capture_time_ms),
+      enqueue_time_ms(enqueue_time_ms),
+      sum_paused_ms(0),
+      bytes(length_in_bytes),
+      retransmission(retransmission),
+      enqueue_order(enqueue_order) {}
+
+PacketQueue::Packet::~Packet() {}
+
+PacketQueue::PacketQueue(const Clock* clock)
+    : bytes_(0),
+      clock_(clock),
+      queue_time_sum_(0),
+      time_last_updated_(clock_->TimeInMilliseconds()),
+      paused_(false) {}
+
+PacketQueue::~PacketQueue() {}
+
+void PacketQueue::Push(const Packet& packet) {
+  if (!AddToDupeSet(packet))
+    return;
+
+  UpdateQueueTime(packet.enqueue_time_ms);
+
+  // Store packet in list, use pointers in priority queue for cheaper moves.
+  // Packets have a handle to its own iterator in the list, for easy removal
+  // when popping from queue.
+  packet_list_.push_front(packet);
+  std::list<Packet>::iterator it = packet_list_.begin();
+  it->this_it = it;          // Handle for direct removal from list.
+  prio_queue_.push(&(*it));  // Pointer into list.
+  bytes_ += packet.bytes;
+}
+
+const PacketQueue::Packet& PacketQueue::BeginPop() {
+  const PacketQueue::Packet& packet = *prio_queue_.top();
+  prio_queue_.pop();
+  return packet;
+}
+
+void PacketQueue::CancelPop(const PacketQueue::Packet& packet) {
+  prio_queue_.push(&(*packet.this_it));
+}
+
+void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) {
+  RemoveFromDupeSet(packet);
+  bytes_ -= packet.bytes;
+  int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
+  RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
+  packet_queue_time_ms -= packet.sum_paused_ms;
+  RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
+  queue_time_sum_ -= packet_queue_time_ms;
+  packet_list_.erase(packet.this_it);
+  RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
+  if (packet_list_.empty())
+    RTC_DCHECK_EQ(0, queue_time_sum_);
+}
+
+bool PacketQueue::Empty() const {
+  return prio_queue_.empty();
+}
+
+size_t PacketQueue::SizeInPackets() const {
+  return prio_queue_.size();
+}
+
+uint64_t PacketQueue::SizeInBytes() const {
+  return bytes_;
+}
+
+int64_t PacketQueue::OldestEnqueueTimeMs() const {
+  auto it = packet_list_.rbegin();
+  if (it == packet_list_.rend())
+    return 0;
+  return it->enqueue_time_ms;
+}
+
+void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
+  RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
+  if (timestamp_ms == time_last_updated_)
+    return;
+
+  int64_t delta_ms = timestamp_ms - time_last_updated_;
+
+  if (paused_) {
+    // Increase per-packet accumulators of time spent in queue while paused,
+    // so that we can disregard that when subtracting main accumulator when
+    // popping packet from the queue.
+    for (auto& it : packet_list_) {
+      it.sum_paused_ms += delta_ms;
+    }
+  } else {
+    // Use packet packet_list_.size() not prio_queue_.size() here, as there
+    // might be an outstanding element popped from prio_queue_ currently in
+    // the SendPacket() call, while packet_list_ will always be correct.
+    queue_time_sum_ += delta_ms * packet_list_.size();
+  }
+  time_last_updated_ = timestamp_ms;
+}
+
+void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
+  if (paused_ == paused)
+    return;
+  UpdateQueueTime(timestamp_ms);
+  paused_ = paused;
+}
+
+int64_t PacketQueue::AverageQueueTimeMs() const {
+  if (prio_queue_.empty())
+    return 0;
+  return queue_time_sum_ / packet_list_.size();
+}
+
+bool PacketQueue::AddToDupeSet(const PacketQueue::Packet& packet) {
+  SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+  if (it == dupe_map_.end()) {
+    // First for this ssrc, just insert.
+    dupe_map_[packet.ssrc].insert(packet.sequence_number);
+    return true;
+  }
+
+  // Insert returns a pair, where second is a bool set to true if new element.
+  return it->second.insert(packet.sequence_number).second;
+}
+
+void PacketQueue::RemoveFromDupeSet(const PacketQueue::Packet& packet) {
+  SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+  RTC_DCHECK(it != dupe_map_.end());
+  it->second.erase(packet.sequence_number);
+  if (it->second.empty()) {
+    dupe_map_.erase(it);
+  }
+}
+
+}  // namespace webrtc
diff --git a/modules/pacing/packet_queue.h b/modules/pacing/packet_queue.h
new file mode 100644
index 0000000..c6aa843
--- /dev/null
+++ b/modules/pacing/packet_queue.h
@@ -0,0 +1,109 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_PACING_PACKET_QUEUE_H_
+#define MODULES_PACING_PACKET_QUEUE_H_
+
+#include <list>
+#include <map>
+#include <queue>
+#include <set>
+#include <vector>
+
+#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+
+namespace webrtc {
+
+class PacketQueue {
+ public:
+  explicit PacketQueue(const Clock* clock);
+  virtual ~PacketQueue();
+
+  struct Packet {
+    Packet(RtpPacketSender::Priority priority,
+           uint32_t ssrc,
+           uint16_t seq_number,
+           int64_t capture_time_ms,
+           int64_t enqueue_time_ms,
+           size_t length_in_bytes,
+           bool retransmission,
+           uint64_t enqueue_order);
+
+    virtual ~Packet();
+
+    RtpPacketSender::Priority priority;
+    uint32_t ssrc;
+    uint16_t sequence_number;
+    int64_t capture_time_ms;  // Absolute time of frame capture.
+    int64_t enqueue_time_ms;  // Absolute time of pacer queue entry.
+    int64_t sum_paused_ms;  // Sum of time spent in queue while pacer is paused.
+    size_t bytes;
+    bool retransmission;
+    uint64_t enqueue_order;
+    std::list<Packet>::iterator this_it;
+  };
+
+  void Push(const Packet& packet);
+  const Packet& BeginPop();
+  void CancelPop(const Packet& packet);
+  void FinalizePop(const Packet& packet);
+  bool Empty() const;
+  size_t SizeInPackets() const;
+  uint64_t SizeInBytes() const;
+  int64_t OldestEnqueueTimeMs() const;
+  void UpdateQueueTime(int64_t timestamp_ms);
+  void SetPauseState(bool paused, int64_t timestamp_ms);
+  int64_t AverageQueueTimeMs() const;
+
+ private:
+  // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+  // queue. Return true if inserted, false if this is a duplicate.
+  bool AddToDupeSet(const Packet& packet);
+
+  void RemoveFromDupeSet(const Packet& packet);
+
+  // Used by priority queue to sort packets.
+  struct Comparator {
+    bool operator()(const Packet* first, const Packet* second) {
+      // Highest prio = 0.
+      if (first->priority != second->priority)
+        return first->priority > second->priority;
+
+      // Retransmissions go first.
+      if (second->retransmission != first->retransmission)
+        return second->retransmission;
+
+      // Older frames have higher prio.
+      if (first->capture_time_ms != second->capture_time_ms)
+        return first->capture_time_ms > second->capture_time_ms;
+
+      return first->enqueue_order > second->enqueue_order;
+    }
+  };
+
+  // List of packets, in the order the were enqueued. Since dequeueing may
+  // occur out of order, use list instead of vector.
+  std::list<Packet> packet_list_;
+  // Priority queue of the packets, sorted according to Comparator.
+  // Use pointers into list, to avodi moving whole struct within heap.
+  std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+  // Total number of bytes in the queue.
+  uint64_t bytes_;
+  // Map<ssrc, std::set<seq_no> >, for checking duplicates.
+  typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+  SsrcSeqNoMap dupe_map_;
+  const Clock* const clock_;
+  int64_t queue_time_sum_;
+  int64_t time_last_updated_;
+  bool paused_;
+};
+}  // namespace webrtc
+
+#endif  // MODULES_PACING_PACKET_QUEUE_H_