Add support within PacedSender and pacer queue for owning rtp packets.
This CL builds on https://webrtc-review.googlesource.com/c/src/+/142165
It adds the parts within the paced sender that uses those send methods.
A follow-up will add the pre-pacer RTP sender parts. That CL will also
add proper integration testing. Here, I mostly add coverage for the new
send methods. When the old code-path is removed, all tests need to be
converted to exclusively use the owned path.
Bug: webrtc:10633
Change-Id: I870d9a2285f07a7b7b0ef6758aa310808f210f28
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/142179
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28308}
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index a35ba85..9c9f7d9 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -85,6 +85,7 @@
"../rtp_rtcp",
"../rtp_rtcp:mock_rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",
+ "//third_party/abseil-cpp/absl/memory",
]
}
diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc
index 3e36f14..6177ca6 100644
--- a/modules/pacing/paced_sender.cc
+++ b/modules/pacing/paced_sender.cc
@@ -44,6 +44,27 @@
return field_trials.Lookup(key).find("Enabled") == 0;
}
+int GetPriorityForType(RtpPacketToSend::Type type) {
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ // Audio is always prioritized over other packet types.
+ return 0;
+ case RtpPacketToSend::Type::kRetransmission:
+ // Send retransmissions before new media.
+ return 1;
+ case RtpPacketToSend::Type::kVideo:
+ // Video has "normal" priority, in the old speak.
+ return 2;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ // Redundancy is OK to drop, but the content is hopefully not useless.
+ return 3;
+ case RtpPacketToSend::Type::kPadding:
+ // Packets that are in themselves likely useless, only sent to keep the
+ // BWE high.
+ return 4;
+ }
+}
+
} // namespace
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@@ -186,9 +207,37 @@
if (capture_time_ms < 0)
capture_time_ms = now_ms;
- packets_.Push(RoundRobinPacketQueue::Packet(
- priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes,
- retransmission, packet_counter_++));
+ RtpPacketToSend::Type type;
+ switch (priority) {
+ case RtpPacketPacer::kHighPriority:
+ type = RtpPacketToSend::Type::kAudio;
+ break;
+ case RtpPacketPacer::kNormalPriority:
+ type = RtpPacketToSend::Type::kRetransmission;
+ break;
+ default:
+ type = RtpPacketToSend::Type::kVideo;
+ }
+ packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
+ capture_time_ms, now_ms, bytes, retransmission,
+ packet_counter_++);
+}
+
+void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
+ rtc::CritScope cs(&critsect_);
+ RTC_DCHECK(pacing_bitrate_kbps_ > 0)
+ << "SetPacingRate must be called before InsertPacket.";
+
+ int64_t now_ms = TimeMilliseconds();
+ prober_.OnIncomingPacket(packet->payload_size());
+
+ if (packet->capture_time_ms() < 0) {
+ packet->set_capture_time_ms(now_ms);
+ }
+
+ RTC_CHECK(packet->packet_type());
+ int priority = GetPriorityForType(*packet->packet_type());
+ packets_.Push(priority, now_ms, packet_counter_++, std::move(packet));
}
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@@ -324,27 +373,43 @@
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!packets_.Empty() && !paused_) {
- const auto* packet = GetPendingPacket(pacing_info);
+ auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr)
break;
+ std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
+ const bool owned_rtp_packet = rtp_packet != nullptr;
+
critsect_.Leave();
- RtpPacketSendResult success = packet_router_->TimeToSendPacket(
- packet->ssrc, packet->sequence_number, packet->capture_time_ms,
- packet->retransmission, pacing_info);
+
+ RtpPacketSendResult success;
+ if (rtp_packet != nullptr) {
+ packet_router_->SendPacket(std::move(rtp_packet), pacing_info);
+ success = RtpPacketSendResult::kSuccess;
+ } else {
+ success = packet_router_->TimeToSendPacket(
+ packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
+ packet->is_retransmission(), pacing_info);
+ }
+
critsect_.Enter();
if (success == RtpPacketSendResult::kSuccess ||
success == RtpPacketSendResult::kPacketNotFound) {
// Packet sent or invalid packet, remove it from queue.
// TODO(webrtc:8052): Don't consume media budget on kInvalid.
- bytes_sent += packet->bytes;
+ bytes_sent += packet->size_in_bytes();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (is_probing && bytes_sent > recommended_probe_size)
break;
+ } else if (owned_rtp_packet) {
+ // Send failed, but we can't put it back in the queue, remove it without
+ // consuming budget.
+ packets_.FinalizePop();
+ break;
} else {
// Send failed, put it back into the queue.
- packets_.CancelPop(*packet);
+ packets_.CancelPop();
break;
}
}
@@ -379,34 +444,34 @@
process_thread_ = process_thread;
}
-const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket(
+RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
const PacedPacketInfo& pacing_info) {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
- const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop();
- bool audio_packet = packet->priority == kHighPriority;
+ RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop();
+ bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
bool apply_pacing = !audio_packet || pace_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) {
- packets_.CancelPop(*packet);
+ packets_.CancelPop();
return nullptr;
}
return packet;
}
-void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) {
+void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
if (first_sent_packet_ms_ == -1)
first_sent_packet_ms_ = TimeMilliseconds();
- bool audio_packet = packet->priority == kHighPriority;
+ bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
- UpdateBudgetWithBytesSent(packet->bytes);
+ UpdateBudgetWithBytesSent(packet->size_in_bytes());
last_send_time_us_ = clock_->TimeInMicroseconds();
}
// Send succeeded, remove it from the queue.
- packets_.FinalizePop(*packet);
+ packets_.FinalizePop();
}
void PacedSender::OnPaddingSent(size_t bytes_sent) {
diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h
index eb98ca2..c67e162 100644
--- a/modules/pacing/paced_sender.h
+++ b/modules/pacing/paced_sender.h
@@ -25,7 +25,9 @@
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/round_robin_packet_queue.h"
+#include "modules/rtp_rtcp/include/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/experiments/field_trial_parser.h"
@@ -35,7 +37,7 @@
class Clock;
class RtcEventLog;
-class PacedSender : public Module, public RtpPacketSender {
+class PacedSender : public Module, public RtpPacketPacer {
public:
static constexpr int64_t kNoCongestionWindow = -1;
@@ -77,8 +79,8 @@
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps);
- // Returns true if we send the packet now, else it will add the packet
- // information to the queue and call TimeToSendPacket when it's time to send.
+ // Adds the packet information to the queue and calls TimeToSendPacket
+ // when it's time to send.
void InsertPacket(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t sequence_number,
@@ -86,6 +88,10 @@
size_t bytes,
bool retransmission) override;
+ // Adds the packet to the queue and calls PacketRouter::SendPacket() when
+ // it's time to send.
+ void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) override;
+
// Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected
@@ -129,10 +135,10 @@
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- const RoundRobinPacketQueue::Packet* GetPendingPacket(
+ RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
const PacedPacketInfo& pacing_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
- void OnPacketSent(const RoundRobinPacketQueue::Packet* packet)
+ void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPaddingSent(size_t padding_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc
index d991d61..d630980 100644
--- a/modules/pacing/paced_sender_unittest.cc
+++ b/modules/pacing/paced_sender_unittest.cc
@@ -12,6 +12,7 @@
#include <memory>
#include <string>
+#include "absl/memory/memory.h"
#include "modules/pacing/paced_sender.h"
#include "modules/pacing/packet_router.h"
#include "system_wrappers/include/clock.h"
@@ -22,6 +23,8 @@
using ::testing::_;
using ::testing::Field;
+using ::testing::Pointee;
+using ::testing::Property;
using ::testing::Return;
namespace {
@@ -34,6 +37,11 @@
constexpr int kBitrateProbingError = 150000;
const float kPaceMultiplier = 2.5f;
+
+constexpr uint32_t kAudioSsrc = 12345;
+constexpr uint32_t kVideoSsrc = 234565;
+constexpr uint32_t kVideoRtxSsrc = 34567;
+constexpr uint32_t kFlexFecSsrc = 45678;
} // namespace
namespace webrtc {
@@ -49,6 +57,9 @@
int64_t capture_time_ms,
bool retransmission,
const PacedPacketInfo& pacing_info));
+ MOCK_METHOD2(SendPacket,
+ void(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& pacing_info));
MOCK_METHOD2(TimeToSendPadding,
size_t(size_t bytes, const PacedPacketInfo& pacing_info));
};
@@ -139,6 +150,30 @@
.Times(1)
.WillRepeatedly(Return(RtpPacketSendResult::kSuccess));
}
+
+ std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
+ auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
+ packet->set_packet_type(type);
+ switch (type) {
+ case RtpPacketToSend::Type::kAudio:
+ packet->SetSsrc(kAudioSsrc);
+ break;
+ case RtpPacketToSend::Type::kVideo:
+ packet->SetSsrc(kVideoSsrc);
+ break;
+ case RtpPacketToSend::Type::kRetransmission:
+ case RtpPacketToSend::Type::kPadding:
+ packet->SetSsrc(kVideoRtxSsrc);
+ break;
+ case RtpPacketToSend::Type::kForwardErrorCorrection:
+ packet->SetSsrc(kFlexFecSsrc);
+ break;
+ }
+
+ packet->SetPayloadSize(234);
+ return packet;
+ }
+
SimulatedClock clock_;
MockPacedSenderCallback callback_;
std::unique_ptr<PacedSender> send_bucket_;
@@ -1292,6 +1327,39 @@
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
}
+TEST_F(PacedSenderTest, OwnedPacketPrioritizedOnType) {
+ // Insert a packet of each type, from low to high priority. Since priority
+ // is weighted higher than insert order, these should come out of the pacer
+ // in backwards order.
+ for (RtpPacketToSend::Type type :
+ {RtpPacketToSend::Type::kPadding,
+ RtpPacketToSend::Type::kForwardErrorCorrection,
+ RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission,
+ RtpPacketToSend::Type::kAudio}) {
+ send_bucket_->EnqueuePacket(BuildRtpPacket(type));
+ }
+
+ ::testing::InSequence seq;
+ EXPECT_CALL(
+ callback_,
+ SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
+ EXPECT_CALL(
+ callback_,
+ SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+ EXPECT_CALL(
+ callback_,
+ SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
+ EXPECT_CALL(
+ callback_,
+ SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
+ EXPECT_CALL(
+ callback_,
+ SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
+
+ clock_.AdvanceTimeMilliseconds(200);
+ send_bucket_->Process();
+}
+
// TODO(philipel): Move to PacketQueue2 unittests.
#if 0
TEST_F(PacedSenderTest, QueueTimeWithPause) {
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a7c2b93..6d2c7ff 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -13,6 +13,7 @@
#include <algorithm>
#include <cstdint>
#include <limits>
+#include <utility>
#include "absl/types/optional.h"
#include "modules/rtp_rtcp/include/rtp_rtcp.h"
@@ -20,6 +21,7 @@
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h"
+#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
@@ -125,6 +127,29 @@
return RtpPacketSendResult::kPacketNotFound;
}
+void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info) {
+ rtc::CritScope cs(&modules_crit_);
+ for (auto* rtp_module : rtp_send_modules_) {
+ if (rtp_module->TrySendPacket(packet.get(), cluster_info)) {
+ const bool can_send_padding =
+ (rtp_module->RtxSendStatus() & kRtxRedundantPayloads) &&
+ rtp_module->HasBweExtensions();
+ if (can_send_padding) {
+ // This is now the last module to send media, and has the desired
+ // properties needed for payload based padding. Cache it for later use.
+ last_send_module_ = rtp_module;
+ }
+ return;
+ }
+ }
+
+ RTC_LOG(LS_WARNING) << "Failed to send packet, matching RTP module not found "
+ "or transport error. SSRC = "
+ << packet->Ssrc() << ", sequence number "
+ << packet->SequenceNumber();
+}
+
size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send,
const PacedPacketInfo& pacing_info) {
size_t total_bytes_sent = 0;
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 4ff5a0b..9a51899 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -14,11 +14,13 @@
#include <stddef.h>
#include <stdint.h>
#include <list>
+#include <memory>
#include <vector>
#include "api/transport/network_types.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/thread_annotations.h"
@@ -30,7 +32,7 @@
class TransportFeedback;
} // namespace rtcp
-// PacketRouter keeps track of RTP send modules to support the pacer.
+// PacketRouter keeps track of rtp send modules to support the pacer.
// In addition, it handles feedback messages, which are sent on a send
// module if possible (sender report), otherwise on receive module
// (receiver report). For the latter case, we also keep track of the
@@ -56,6 +58,9 @@
bool retransmission,
const PacedPacketInfo& packet_info);
+ virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
+ const PacedPacketInfo& cluster_info);
+
virtual size_t TimeToSendPadding(size_t bytes,
const PacedPacketInfo& packet_info);
diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc
index a87c47c..9f52a6a 100644
--- a/modules/pacing/round_robin_packet_queue.cc
+++ b/modules/pacing/round_robin_packet_queue.cc
@@ -18,36 +18,53 @@
namespace webrtc {
-RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- int64_t enqueue_time_ms,
- size_t length_in_bytes,
- bool retransmission,
- uint64_t enqueue_order)
- : priority(priority),
- ssrc(ssrc),
- sequence_number(seq_number),
- capture_time_ms(capture_time_ms),
- enqueue_time_ms(enqueue_time_ms),
- sum_paused_ms(0),
- bytes(length_in_bytes),
- retransmission(retransmission),
- enqueue_order(enqueue_order) {}
+RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
+ default;
+RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
-RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default;
+RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
+ int priority,
+ RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order,
+ std::multiset<int64_t>::iterator enqueue_time_it,
+ absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+ packet_it)
+ : type_(type),
+ priority_(priority),
+ ssrc_(ssrc),
+ sequence_number_(seq_number),
+ capture_time_ms_(capture_time_ms),
+ enqueue_time_ms_(enqueue_time_ms),
+ bytes_(length_in_bytes),
+ retransmission_(retransmission),
+ enqueue_order_(enqueue_order),
+ enqueue_time_it_(enqueue_time_it),
+ packet_it_(packet_it) {}
-RoundRobinPacketQueue::Packet::~Packet() {}
+std::unique_ptr<RtpPacketToSend>
+RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
+ return packet_it_ ? std::move(**packet_it_) : nullptr;
+}
-bool RoundRobinPacketQueue::Packet::operator<(
- const RoundRobinPacketQueue::Packet& other) const {
- if (priority != other.priority)
- return priority > other.priority;
- if (retransmission != other.retransmission)
- return other.retransmission;
+void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs(
+ int64_t pause_time_sum_ms) {
+ enqueue_time_ms_ -= pause_time_sum_ms;
+}
- return enqueue_order > other.enqueue_order;
+bool RoundRobinPacketQueue::QueuedPacket::operator<(
+ const RoundRobinPacketQueue::QueuedPacket& other) const {
+ if (priority_ != other.priority_)
+ return priority_ > other.priority_;
+ if (retransmission_ != other.retransmission_)
+ return other.retransmission_;
+
+ return enqueue_order_ > other.enqueue_order_;
}
RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
@@ -59,50 +76,41 @@
RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
-void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) {
- Packet packet(packet_to_insert);
-
- auto stream_info_it = streams_.find(packet.ssrc);
- if (stream_info_it == streams_.end()) {
- stream_info_it = streams_.emplace(packet.ssrc, Stream()).first;
- stream_info_it->second.priority_it = stream_priorities_.end();
- stream_info_it->second.ssrc = packet.ssrc;
- }
-
- Stream* stream = &stream_info_it->second;
-
- if (stream->priority_it == stream_priorities_.end()) {
- // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
- RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
- stream->priority_it = stream_priorities_.emplace(
- StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
- } else if (packet.priority < stream->priority_it->first.priority) {
- // If the priority of this SSRC increased, remove the outdated StreamPrioKey
- // and insert a new one with the new priority. Note that
- // RtpPacketSender::Priority uses lower ordinal for higher priority.
- stream_priorities_.erase(stream->priority_it);
- stream->priority_it = stream_priorities_.emplace(
- StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
- }
- RTC_CHECK(stream->priority_it != stream_priorities_.end());
-
- packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
-
- // In order to figure out how much time a packet has spent in the queue while
- // not in a paused state, we subtract the total amount of time the queue has
- // been paused so far, and when the packet is poped we subtract the total
- // amount of time the queue has been paused at that moment. This way we
- // subtract the total amount of time the packet has spent in the queue while
- // in a paused state.
- UpdateQueueTime(packet.enqueue_time_ms);
- packet.enqueue_time_ms -= pause_time_sum_ms_;
- stream->packet_queue.push(packet);
-
- size_packets_ += 1;
- size_bytes_ += packet.bytes;
+void RoundRobinPacketQueue::Push(int priority,
+ RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order) {
+ Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
+ enqueue_time_ms, length_in_bytes, retransmission,
+ enqueue_order, enqueue_times_.insert(enqueue_time_ms),
+ absl::nullopt));
}
-const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() {
+void RoundRobinPacketQueue::Push(int priority,
+ int64_t enqueue_time_ms,
+ uint64_t enqueue_order,
+ std::unique_ptr<RtpPacketToSend> packet) {
+ uint32_t ssrc = packet->Ssrc();
+ uint16_t sequence_number = packet->SequenceNumber();
+ int64_t capture_time_ms = packet->capture_time_ms();
+ size_t size_bytes = packet->payload_size();
+ auto type = packet->packet_type();
+ RTC_DCHECK(type.has_value());
+
+ rtp_packets_.push_front(std::move(packet));
+ Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms,
+ enqueue_time_ms, size_bytes,
+ *type == RtpPacketToSend::Type::kRetransmission,
+ enqueue_order, enqueue_times_.insert(enqueue_time_ms),
+ rtp_packets_.begin()));
+}
+
+RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
RTC_CHECK(!pop_packet_ && !pop_stream_);
Stream* stream = GetHighestPriorityStream();
@@ -110,22 +118,22 @@
pop_packet_.emplace(stream->packet_queue.top());
stream->packet_queue.pop();
- return *pop_packet_;
+ return &pop_packet_.value();
}
-void RoundRobinPacketQueue::CancelPop(const Packet& packet) {
+void RoundRobinPacketQueue::CancelPop() {
RTC_CHECK(pop_packet_ && pop_stream_);
(*pop_stream_)->packet_queue.push(*pop_packet_);
pop_packet_.reset();
pop_stream_.reset();
}
-void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
+void RoundRobinPacketQueue::FinalizePop() {
if (!Empty()) {
RTC_CHECK(pop_packet_ && pop_stream_);
Stream* stream = *pop_stream_;
stream_priorities_.erase(stream->priority_it);
- const Packet& packet = *pop_packet_;
+ const QueuedPacket& packet = *pop_packet_;
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
@@ -133,11 +141,16 @@
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
int64_t time_in_non_paused_state_ms =
- time_last_updated_ms_ - packet.enqueue_time_ms - pause_time_sum_ms_;
+ time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_;
queue_time_sum_ms_ -= time_in_non_paused_state_ms;
- RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end());
- enqueue_times_.erase(packet.enqueue_time_it);
+ RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
+ enqueue_times_.erase(packet.EnqueueTimeIterator());
+
+ auto packet_it = packet.PacketIterator();
+ if (packet_it) {
+ rtp_packets_.erase(*packet_it);
+ }
// Update |bytes| of this stream. The general idea is that the stream that
// has sent the least amount of bytes should have the highest priority.
@@ -145,11 +158,11 @@
// case a "budget" will be built up for the stream sending at the lower
// rate. To avoid building a too large budget we limit |bytes| to be within
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
- stream->bytes =
- std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes);
+ stream->bytes = std::max(stream->bytes + packet.size_in_bytes(),
+ max_bytes_ - kMaxLeadingBytes);
max_bytes_ = std::max(max_bytes_, stream->bytes);
- size_bytes_ -= packet.bytes;
+ size_bytes_ -= packet.size_in_bytes();
size_packets_ -= 1;
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
@@ -158,7 +171,7 @@
if (stream->packet_queue.empty()) {
stream->priority_it = stream_priorities_.end();
} else {
- RtpPacketSender::Priority priority = stream->packet_queue.top().priority;
+ int priority = stream->packet_queue.top().priority();
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(priority, stream->bytes), stream->ssrc);
}
@@ -218,6 +231,46 @@
return queue_time_sum_ms_ / size_packets_;
}
+void RoundRobinPacketQueue::Push(QueuedPacket packet) {
+ auto stream_info_it = streams_.find(packet.ssrc());
+ if (stream_info_it == streams_.end()) {
+ stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
+ stream_info_it->second.priority_it = stream_priorities_.end();
+ stream_info_it->second.ssrc = packet.ssrc();
+ }
+
+ Stream* stream = &stream_info_it->second;
+
+ if (stream->priority_it == stream_priorities_.end()) {
+ // If the SSRC is not currently scheduled, add it to |stream_priorities_|.
+ RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
+ stream->priority_it = stream_priorities_.emplace(
+ StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
+ } else if (packet.priority() < stream->priority_it->first.priority) {
+ // If the priority of this SSRC increased, remove the outdated StreamPrioKey
+ // and insert a new one with the new priority. Note that |priority_| uses
+ // lower ordinal for higher priority.
+ stream_priorities_.erase(stream->priority_it);
+ stream->priority_it = stream_priorities_.emplace(
+ StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
+ }
+ RTC_CHECK(stream->priority_it != stream_priorities_.end());
+
+ // In order to figure out how much time a packet has spent in the queue while
+ // not in a paused state, we subtract the total amount of time the queue has
+ // been paused so far, and when the packet is popped we subtract the total
+ // amount of time the queue has been paused at that moment. This way we
+ // subtract the total amount of time the packet has spent in the queue while
+ // in a paused state.
+ UpdateQueueTime(packet.enqueue_time_ms());
+ packet.SubtractPauseTimeMs(pause_time_sum_ms_);
+
+ size_packets_ += 1;
+ size_bytes_ += packet.size_in_bytes();
+
+ stream->packet_queue.push(packet);
+}
+
RoundRobinPacketQueue::Stream*
RoundRobinPacketQueue::GetHighestPriorityStream() {
RTC_CHECK(!stream_priorities_.empty());
diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h
index 74b855a..812ae87 100644
--- a/modules/pacing/round_robin_packet_queue.h
+++ b/modules/pacing/round_robin_packet_queue.h
@@ -15,11 +15,13 @@
#include <stdint.h>
#include <list>
#include <map>
+#include <memory>
#include <queue>
#include <set>
#include "absl/types/optional.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@@ -29,36 +31,80 @@
explicit RoundRobinPacketQueue(int64_t start_time_us);
~RoundRobinPacketQueue();
- struct Packet {
- Packet(RtpPacketSender::Priority priority,
- uint32_t ssrc,
- uint16_t seq_number,
- int64_t capture_time_ms,
- int64_t enqueue_time_ms,
- size_t length_in_bytes,
- bool retransmission,
- uint64_t enqueue_order);
- Packet(const Packet& other);
- virtual ~Packet();
- bool operator<(const Packet& other) const;
+ struct QueuedPacket {
+ public:
+ QueuedPacket(
+ int priority,
+ RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order,
+ std::multiset<int64_t>::iterator enqueue_time_it,
+ absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+ packet_it);
+ QueuedPacket(const QueuedPacket& rhs);
+ ~QueuedPacket();
- RtpPacketSender::Priority priority;
- uint32_t ssrc;
- uint16_t sequence_number;
- int64_t capture_time_ms; // Absolute time of frame capture.
- int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
- int64_t sum_paused_ms;
- size_t bytes;
- bool retransmission;
- uint64_t enqueue_order;
- std::list<Packet>::iterator this_it;
- std::multiset<int64_t>::iterator enqueue_time_it;
+ bool operator<(const QueuedPacket& other) const;
+
+ int priority() const { return priority_; }
+ RtpPacketToSend::Type type() const { return type_; }
+ uint32_t ssrc() const { return ssrc_; }
+ uint16_t sequence_number() const { return sequence_number_; }
+ int64_t capture_time_ms() const { return capture_time_ms_; }
+ int64_t enqueue_time_ms() const { return enqueue_time_ms_; }
+ size_t size_in_bytes() const { return bytes_; }
+ bool is_retransmission() const { return retransmission_; }
+ uint64_t enqueue_order() const { return enqueue_order_; }
+ std::unique_ptr<RtpPacketToSend> ReleasePacket();
+
+ // For internal use.
+ absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+ PacketIterator() const {
+ return packet_it_;
+ }
+ std::multiset<int64_t>::iterator EnqueueTimeIterator() const {
+ return enqueue_time_it_;
+ }
+ void SubtractPauseTimeMs(int64_t pause_time_sum_ms);
+
+ private:
+ RtpPacketToSend::Type type_;
+ int priority_;
+ uint32_t ssrc_;
+ uint16_t sequence_number_;
+ int64_t capture_time_ms_; // Absolute time of frame capture.
+ int64_t enqueue_time_ms_; // Absolute time of pacer queue entry.
+ size_t bytes_;
+ bool retransmission_;
+ uint64_t enqueue_order_;
+ std::multiset<int64_t>::iterator enqueue_time_it_;
+ // Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
+ // if applicable.
+ absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
+ packet_it_;
};
- void Push(const Packet& packet);
- const Packet& BeginPop();
- void CancelPop(const Packet& packet);
- void FinalizePop(const Packet& packet);
+ void Push(int priority,
+ RtpPacketToSend::Type type,
+ uint32_t ssrc,
+ uint16_t seq_number,
+ int64_t capture_time_ms,
+ int64_t enqueue_time_ms,
+ size_t length_in_bytes,
+ bool retransmission,
+ uint64_t enqueue_order);
+ void Push(int priority,
+ int64_t enqueue_time_ms,
+ uint64_t enqueue_order,
+ std::unique_ptr<RtpPacketToSend> packet);
+ QueuedPacket* BeginPop();
+ void CancelPop();
+ void FinalizePop();
bool Empty() const;
size_t SizeInPackets() const;
@@ -71,7 +117,7 @@
private:
struct StreamPrioKey {
- StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes)
+ StreamPrioKey(int priority, int64_t bytes)
: priority(priority), bytes(bytes) {}
bool operator<(const StreamPrioKey& other) const {
@@ -80,7 +126,7 @@
return bytes < other.bytes;
}
- const RtpPacketSender::Priority priority;
+ const int priority;
const size_t bytes;
};
@@ -92,7 +138,7 @@
size_t bytes;
uint32_t ssrc;
- std::priority_queue<Packet> packet_queue;
+ std::priority_queue<QueuedPacket> packet_queue;
// Whenever a packet is inserted for this stream we check if |priority_it|
// points to an element in |stream_priorities_|, and if it does it means
@@ -104,13 +150,15 @@
static constexpr size_t kMaxLeadingBytes = 1400;
+ void Push(QueuedPacket packet);
+
Stream* GetHighestPriorityStream();
// Just used to verify correctness.
bool IsSsrcScheduled(uint32_t ssrc) const;
int64_t time_last_updated_ms_;
- absl::optional<Packet> pop_packet_;
+ absl::optional<QueuedPacket> pop_packet_;
absl::optional<Stream*> pop_stream_;
bool paused_ = false;
@@ -132,6 +180,12 @@
// The enqueue time of every packet currently in the queue. Used to figure out
// the age of the oldest packet in the queue.
std::multiset<int64_t> enqueue_times_;
+
+ // List of RTP packets to be sent, not necessarily in the order they will be
+ // sent. PacketInfo.packet_it will point to an entry in this list, or the
+ // end iterator of this list if queue does not have direct ownership of the
+ // packet.
+ std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
};
} // namespace webrtc
diff --git a/modules/rtp_rtcp/include/rtp_packet_pacer.h b/modules/rtp_rtcp/include/rtp_packet_pacer.h
index 9820fc2..180ddf7 100644
--- a/modules/rtp_rtcp/include/rtp_packet_pacer.h
+++ b/modules/rtp_rtcp/include/rtp_packet_pacer.h
@@ -23,10 +23,10 @@
// TODO(bugs.webrtc.org/10633): Add things missing to this interface so that we
// can use multiple different pacer implementations, and stop inheriting from
// RtpPacketSender.
-class RtpPacketPacer : RtpPacketSender {
+class RtpPacketPacer : public RtpPacketSender {
public:
RtpPacketPacer() = default;
- ~RtpPacketPacer() override;
+ ~RtpPacketPacer() override = default;
// Insert packet into queue, for eventual transmission. Based on the type of
// the packet, it will prioritized and scheduled relative to other packets and
diff --git a/modules/rtp_rtcp/include/rtp_rtcp.h b/modules/rtp_rtcp/include/rtp_rtcp.h
index 6734e6c..dbe6345 100644
--- a/modules/rtp_rtcp/include/rtp_rtcp.h
+++ b/modules/rtp_rtcp/include/rtp_rtcp.h
@@ -26,6 +26,7 @@
#include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/include/report_block_data.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/rtp_rtcp/source/rtp_sender.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/deprecation.h"