Move PacketQueue out of paced_sender.cc to its own packet_queue.{cc,h}.
Bug: webrtc:8287, webrtc:8288
Change-Id: If8937458c5b8f5a75b3de441aa409ae873f4bda2
Reviewed-on: https://webrtc-review.googlesource.com/3761
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20003}diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 4882d4f..f72d3f9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -19,6 +19,8 @@
"paced_sender.cc",
"paced_sender.h",
"pacer.h",
+ "packet_queue.cc",
+ "packet_queue.h",
"packet_router.cc",
"packet_router.h",
]
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 785a2de..96355b3 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -37,201 +37,7 @@
} // namespace
-// TODO(sprang): Move at least PacketQueue out to separate files, so that we can
-// more easily test them.
-
namespace webrtc {
-namespace paced_sender {
-struct Packet {
- Packet(RtpPacketSender::Priority priority,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- int64_t enqueue_time_ms,
- size_t length_in_bytes,
- bool retransmission,
- uint64_t enqueue_order)
- : priority(priority),
- ssrc(ssrc),
- sequence_number(seq_number),
- capture_time_ms(capture_time_ms),
- enqueue_time_ms(enqueue_time_ms),
- sum_paused_ms(0),
- bytes(length_in_bytes),
- retransmission(retransmission),
- enqueue_order(enqueue_order) {}
-
- RtpPacketSender::Priority priority;
- uint32_t ssrc;
- uint16_t sequence_number;
- int64_t capture_time_ms; // Absolute time of frame capture.
- int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
- int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused.
- size_t bytes;
- bool retransmission;
- uint64_t enqueue_order;
- std::list<Packet>::iterator this_it;
-};
-
-// Used by priority queue to sort packets.
-struct Comparator {
- bool operator()(const Packet* first, const Packet* second) {
- // Highest prio = 0.
- if (first->priority != second->priority)
- return first->priority > second->priority;
-
- // Retransmissions go first.
- if (second->retransmission != first->retransmission)
- return second->retransmission;
-
- // Older frames have higher prio.
- if (first->capture_time_ms != second->capture_time_ms)
- return first->capture_time_ms > second->capture_time_ms;
-
- return first->enqueue_order > second->enqueue_order;
- }
-};
-
-// Class encapsulating a priority queue with some extensions.
-class PacketQueue {
- public:
- explicit PacketQueue(const Clock* clock)
- : bytes_(0),
- clock_(clock),
- queue_time_sum_(0),
- time_last_updated_(clock_->TimeInMilliseconds()),
- paused_(false) {}
- virtual ~PacketQueue() {}
-
- void Push(const Packet& packet) {
- if (!AddToDupeSet(packet))
- return;
-
- UpdateQueueTime(packet.enqueue_time_ms);
-
- // Store packet in list, use pointers in priority queue for cheaper moves.
- // Packets have a handle to its own iterator in the list, for easy removal
- // when popping from queue.
- packet_list_.push_front(packet);
- std::list<Packet>::iterator it = packet_list_.begin();
- it->this_it = it; // Handle for direct removal from list.
- prio_queue_.push(&(*it)); // Pointer into list.
- bytes_ += packet.bytes;
- }
-
- const Packet& BeginPop() {
- const Packet& packet = *prio_queue_.top();
- prio_queue_.pop();
- return packet;
- }
-
- void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
-
- void FinalizePop(const Packet& packet) {
- RemoveFromDupeSet(packet);
- bytes_ -= packet.bytes;
- int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
- RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
- packet_queue_time_ms -= packet.sum_paused_ms;
- RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
- queue_time_sum_ -= packet_queue_time_ms;
- packet_list_.erase(packet.this_it);
- RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
- if (packet_list_.empty())
- RTC_DCHECK_EQ(0, queue_time_sum_);
- }
-
- bool Empty() const { return prio_queue_.empty(); }
-
- size_t SizeInPackets() const { return prio_queue_.size(); }
-
- uint64_t SizeInBytes() const { return bytes_; }
-
- int64_t OldestEnqueueTimeMs() const {
- auto it = packet_list_.rbegin();
- if (it == packet_list_.rend())
- return 0;
- return it->enqueue_time_ms;
- }
-
- void UpdateQueueTime(int64_t timestamp_ms) {
- RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
- if (timestamp_ms == time_last_updated_)
- return;
-
- int64_t delta_ms = timestamp_ms - time_last_updated_;
-
- if (paused_) {
- // Increase per-packet accumulators of time spent in queue while paused,
- // so that we can disregard that when subtracting main accumulator when
- // popping packet from the queue.
- for (auto& it : packet_list_) {
- it.sum_paused_ms += delta_ms;
- }
- } else {
- // Use packet packet_list_.size() not prio_queue_.size() here, as there
- // might be an outstanding element popped from prio_queue_ currently in
- // the SendPacket() call, while packet_list_ will always be correct.
- queue_time_sum_ += delta_ms * packet_list_.size();
- }
- time_last_updated_ = timestamp_ms;
- }
-
- void SetPauseState(bool paused, int64_t timestamp_ms) {
- if (paused_ == paused)
- return;
- UpdateQueueTime(timestamp_ms);
- paused_ = paused;
- }
-
- int64_t AverageQueueTimeMs() const {
- if (prio_queue_.empty())
- return 0;
- return queue_time_sum_ / packet_list_.size();
- }
-
- private:
- // Try to add a packet to the set of ssrc/seqno identifiers currently in the
- // queue. Return true if inserted, false if this is a duplicate.
- bool AddToDupeSet(const Packet& packet) {
- SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
- if (it == dupe_map_.end()) {
- // First for this ssrc, just insert.
- dupe_map_[packet.ssrc].insert(packet.sequence_number);
- return true;
- }
-
- // Insert returns a pair, where second is a bool set to true if new element.
- return it->second.insert(packet.sequence_number).second;
- }
-
- void RemoveFromDupeSet(const Packet& packet) {
- SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
- RTC_DCHECK(it != dupe_map_.end());
- it->second.erase(packet.sequence_number);
- if (it->second.empty()) {
- dupe_map_.erase(it);
- }
- }
-
- // List of packets, in the order the were enqueued. Since dequeueing may
- // occur out of order, use list instead of vector.
- std::list<Packet> packet_list_;
- // Priority queue of the packets, sorted according to Comparator.
- // Use pointers into list, to avoid moving whole struct within heap.
- std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
- // Total number of bytes in the queue.
- uint64_t bytes_;
- // Map<ssrc, std::set<seq_no> >, for checking duplicates.
- typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
- SsrcSeqNoMap dupe_map_;
- const Clock* const clock_;
- int64_t queue_time_sum_;
- int64_t time_last_updated_;
- bool paused_;
-};
-
-} // namespace paced_sender
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@@ -253,7 +59,7 @@
pacing_bitrate_kbps_(0),
time_last_update_us_(clock->TimeInMicroseconds()),
first_sent_packet_ms_(-1),
- packets_(new paced_sender::PacketQueue(clock)),
+ packets_(new PacketQueue(clock)),
packet_counter_(0),
pacing_factor_(kDefaultPaceMultiplier),
queue_time_limit(kMaxQueueLengthMs) {
@@ -342,9 +148,9 @@
if (capture_time_ms < 0)
capture_time_ms = now_ms;
- packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
- capture_time_ms, now_ms, bytes,
- retransmission, packet_counter_++));
+ packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number,
+ capture_time_ms, now_ms, bytes,
+ retransmission, packet_counter_++));
}
int64_t PacedSender::ExpectedQueueTimeMs() const {
@@ -455,7 +261,7 @@
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
- const paced_sender::Packet& packet = packets_->BeginPop();
+ const PacketQueue::Packet& packet = packets_->BeginPop();
if (SendPacket(packet, pacing_info)) {
// Send succeeded, remove it from the queue.
@@ -496,7 +302,7 @@
process_thread_ = process_thread;
}
-bool PacedSender::SendPacket(const paced_sender::Packet& packet,
+bool PacedSender::SendPacket(const PacketQueue::Packet& packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(!paused_);
if (media_budget_->bytes_remaining() == 0 &&
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index 754d750..499f1d9 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -11,12 +11,11 @@
#ifndef MODULES_PACING_PACED_SENDER_H_
#define MODULES_PACING_PACED_SENDER_H_
-#include <list>
#include <memory>
-#include <set>
#include "api/optional.h"
#include "modules/pacing/pacer.h"
+#include "modules/pacing/packet_queue.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/thread_annotations.h"
#include "typedefs.h" // NOLINT(build/include)
@@ -29,12 +28,6 @@
class RtcEventLog;
class IntervalBudget;
-namespace paced_sender {
-class IntervalBudget;
-struct Packet;
-class PacketQueue;
-} // namespace paced_sender
-
class PacedSender : public Pacer {
public:
class PacketSender {
@@ -159,7 +152,7 @@
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- bool SendPacket(const paced_sender::Packet& packet,
+ bool SendPacket(const PacketQueue::Packet& packet,
const PacedPacketInfo& cluster_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
@@ -191,7 +184,7 @@
int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_);
int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
- std::unique_ptr<paced_sender::PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
+ std::unique_ptr<PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
uint64_t packet_counter_;
ProcessThread* process_thread_ = nullptr;
diff --git a/modules/pacing/packet_queue.cc b/modules/pacing/packet_queue.cc
new file mode 100644
index 0000000..765e43f
--- /dev/null
+++ b/modules/pacing/packet_queue.cc
@@ -0,0 +1,174 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "modules/pacing/packet_queue.h"
+
+#include <algorithm>
+#include <list>
+#include <vector>
+
+#include "modules/include/module_common_types.h"
+#include "modules/pacing/alr_detector.h"
+#include "modules/pacing/bitrate_prober.h"
+#include "modules/pacing/interval_budget.h"
+#include "modules/utility/include/process_thread.h"
+#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
+#include "system_wrappers/include/clock.h"
+#include "system_wrappers/include/field_trial.h"
+
+namespace webrtc {
+
+PacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order)
+ : priority(priority),
+ ssrc(ssrc),
+ sequence_number(seq_number),
+ capture_time_ms(capture_time_ms),
+ enqueue_time_ms(enqueue_time_ms),
+ sum_paused_ms(0),
+ bytes(length_in_bytes),
+ retransmission(retransmission),
+ enqueue_order(enqueue_order) {}
+
+PacketQueue::Packet::~Packet() {}
+
+PacketQueue::PacketQueue(const Clock* clock)
+ : bytes_(0),
+ clock_(clock),
+ queue_time_sum_(0),
+ time_last_updated_(clock_->TimeInMilliseconds()),
+ paused_(false) {}
+
+PacketQueue::~PacketQueue() {}
+
+void PacketQueue::Push(const Packet& packet) {
+ if (!AddToDupeSet(packet))
+ return;
+
+ UpdateQueueTime(packet.enqueue_time_ms);
+
+ // Store packet in list, use pointers in priority queue for cheaper moves.
+ // Packets have a handle to its own iterator in the list, for easy removal
+ // when popping from queue.
+ packet_list_.push_front(packet);
+ std::list<Packet>::iterator it = packet_list_.begin();
+ it->this_it = it; // Handle for direct removal from list.
+ prio_queue_.push(&(*it)); // Pointer into list.
+ bytes_ += packet.bytes;
+}
+
+const PacketQueue::Packet& PacketQueue::BeginPop() {
+ const PacketQueue::Packet& packet = *prio_queue_.top();
+ prio_queue_.pop();
+ return packet;
+}
+
+void PacketQueue::CancelPop(const PacketQueue::Packet& packet) {
+ prio_queue_.push(&(*packet.this_it));
+}
+
+void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) {
+ RemoveFromDupeSet(packet);
+ bytes_ -= packet.bytes;
+ int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
+ RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
+ packet_queue_time_ms -= packet.sum_paused_ms;
+ RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
+ queue_time_sum_ -= packet_queue_time_ms;
+ packet_list_.erase(packet.this_it);
+ RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
+ if (packet_list_.empty())
+ RTC_DCHECK_EQ(0, queue_time_sum_);
+}
+
+bool PacketQueue::Empty() const {
+ return prio_queue_.empty();
+}
+
+size_t PacketQueue::SizeInPackets() const {
+ return prio_queue_.size();
+}
+
+uint64_t PacketQueue::SizeInBytes() const {
+ return bytes_;
+}
+
+int64_t PacketQueue::OldestEnqueueTimeMs() const {
+ auto it = packet_list_.rbegin();
+ if (it == packet_list_.rend())
+ return 0;
+ return it->enqueue_time_ms;
+}
+
+void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
+ RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
+ if (timestamp_ms == time_last_updated_)
+ return;
+
+ int64_t delta_ms = timestamp_ms - time_last_updated_;
+
+ if (paused_) {
+ // Increase per-packet accumulators of time spent in queue while paused,
+ // so that we can disregard that when subtracting main accumulator when
+ // popping packet from the queue.
+ for (auto& it : packet_list_) {
+ it.sum_paused_ms += delta_ms;
+ }
+ } else {
+ // Use packet packet_list_.size() not prio_queue_.size() here, as there
+ // might be an outstanding element popped from prio_queue_ currently in
+ // the SendPacket() call, while packet_list_ will always be correct.
+ queue_time_sum_ += delta_ms * packet_list_.size();
+ }
+ time_last_updated_ = timestamp_ms;
+}
+
+void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
+ if (paused_ == paused)
+ return;
+ UpdateQueueTime(timestamp_ms);
+ paused_ = paused;
+}
+
+int64_t PacketQueue::AverageQueueTimeMs() const {
+ if (prio_queue_.empty())
+ return 0;
+ return queue_time_sum_ / packet_list_.size();
+}
+
+bool PacketQueue::AddToDupeSet(const PacketQueue::Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ if (it == dupe_map_.end()) {
+ // First for this ssrc, just insert.
+ dupe_map_[packet.ssrc].insert(packet.sequence_number);
+ return true;
+ }
+
+ // Insert returns a pair, where second is a bool set to true if new element.
+ return it->second.insert(packet.sequence_number).second;
+}
+
+void PacketQueue::RemoveFromDupeSet(const PacketQueue::Packet& packet) {
+ SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
+ RTC_DCHECK(it != dupe_map_.end());
+ it->second.erase(packet.sequence_number);
+ if (it->second.empty()) {
+ dupe_map_.erase(it);
+ }
+}
+
+} // namespace webrtc
diff --git a/modules/pacing/packet_queue.h b/modules/pacing/packet_queue.h
new file mode 100644
index 0000000..c6aa843
--- /dev/null
+++ b/modules/pacing/packet_queue.h
@@ -0,0 +1,109 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef MODULES_PACING_PACKET_QUEUE_H_
+#define MODULES_PACING_PACKET_QUEUE_H_
+
+#include <list>
+#include <map>
+#include <queue>
+#include <set>
+#include <vector>
+
+#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+
+namespace webrtc {
+
+class PacketQueue {
+ public:
+ explicit PacketQueue(const Clock* clock);
+ virtual ~PacketQueue();
+
+ struct Packet {
+ Packet(RtpPacketSender::Priority priority,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order);
+
+ virtual ~Packet();
+
+ RtpPacketSender::Priority priority;
+ uint32_t ssrc;
+ uint16_t sequence_number;
+ int64_t capture_time_ms; // Absolute time of frame capture.
+ int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
+ int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused.
+ size_t bytes;
+ bool retransmission;
+ uint64_t enqueue_order;
+ std::list<Packet>::iterator this_it;
+ };
+
+ void Push(const Packet& packet);
+ const Packet& BeginPop();
+ void CancelPop(const Packet& packet);
+ void FinalizePop(const Packet& packet);
+ bool Empty() const;
+ size_t SizeInPackets() const;
+ uint64_t SizeInBytes() const;
+ int64_t OldestEnqueueTimeMs() const;
+ void UpdateQueueTime(int64_t timestamp_ms);
+ void SetPauseState(bool paused, int64_t timestamp_ms);
+ int64_t AverageQueueTimeMs() const;
+
+ private:
+ // Try to add a packet to the set of ssrc/seqno identifiers currently in the
+ // queue. Return true if inserted, false if this is a duplicate.
+ bool AddToDupeSet(const Packet& packet);
+
+ void RemoveFromDupeSet(const Packet& packet);
+
+ // Used by priority queue to sort packets.
+ struct Comparator {
+ bool operator()(const Packet* first, const Packet* second) {
+ // Highest prio = 0.
+ if (first->priority != second->priority)
+ return first->priority > second->priority;
+
+ // Retransmissions go first.
+ if (second->retransmission != first->retransmission)
+ return second->retransmission;
+
+ // Older frames have higher prio.
+ if (first->capture_time_ms != second->capture_time_ms)
+ return first->capture_time_ms > second->capture_time_ms;
+
+ return first->enqueue_order > second->enqueue_order;
+ }
+ };
+
+ // List of packets, in the order the were enqueued. Since dequeueing may
+ // occur out of order, use list instead of vector.
+ std::list<Packet> packet_list_;
+ // Priority queue of the packets, sorted according to Comparator.
+ // Use pointers into list, to avodi moving whole struct within heap.
+ std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
+ // Total number of bytes in the queue.
+ uint64_t bytes_;
+ // Map<ssrc, std::set<seq_no> >, for checking duplicates.
+ typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
+ SsrcSeqNoMap dupe_map_;
+ const Clock* const clock_;
+ int64_t queue_time_sum_;
+ int64_t time_last_updated_;
+ bool paused_;
+};
+} // namespace webrtc
+
+#endif // MODULES_PACING_PACKET_QUEUE_H_