|  | /* | 
|  | *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved. | 
|  | * | 
|  | *  Use of this source code is governed by a BSD-style license | 
|  | *  that can be found in the LICENSE file in the root of the source | 
|  | *  tree. An additional intellectual property rights grant can be found | 
|  | *  in the file PATENTS.  All contributing project authors may | 
|  | *  be found in the AUTHORS file in the root of the source tree. | 
|  | */ | 
|  |  | 
|  | #include "modules/pacing/round_robin_packet_queue.h" | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <cstdint> | 
|  | #include <utility> | 
|  |  | 
|  | #include "rtc_base/checks.h" | 
|  |  | 
|  | namespace webrtc { | 
|  |  | 
|  | RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) = | 
|  | default; | 
|  | RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default; | 
|  |  | 
|  | RoundRobinPacketQueue::QueuedPacket::QueuedPacket( | 
|  | int priority, | 
|  | RtpPacketToSend::Type type, | 
|  | uint32_t ssrc, | 
|  | uint16_t seq_number, | 
|  | int64_t capture_time_ms, | 
|  | int64_t enqueue_time_ms, | 
|  | size_t length_in_bytes, | 
|  | bool retransmission, | 
|  | uint64_t enqueue_order, | 
|  | std::multiset<int64_t>::iterator enqueue_time_it, | 
|  | absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator> | 
|  | packet_it) | 
|  | : type_(type), | 
|  | priority_(priority), | 
|  | ssrc_(ssrc), | 
|  | sequence_number_(seq_number), | 
|  | capture_time_ms_(capture_time_ms), | 
|  | enqueue_time_ms_(enqueue_time_ms), | 
|  | bytes_(length_in_bytes), | 
|  | retransmission_(retransmission), | 
|  | enqueue_order_(enqueue_order), | 
|  | enqueue_time_it_(enqueue_time_it), | 
|  | packet_it_(packet_it) {} | 
|  |  | 
|  | std::unique_ptr<RtpPacketToSend> | 
|  | RoundRobinPacketQueue::QueuedPacket::ReleasePacket() { | 
|  | return packet_it_ ? std::move(**packet_it_) : nullptr; | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs( | 
|  | int64_t pause_time_sum_ms) { | 
|  | enqueue_time_ms_ -= pause_time_sum_ms; | 
|  | } | 
|  |  | 
|  | bool RoundRobinPacketQueue::QueuedPacket::operator<( | 
|  | const RoundRobinPacketQueue::QueuedPacket& other) const { | 
|  | if (priority_ != other.priority_) | 
|  | return priority_ > other.priority_; | 
|  | if (retransmission_ != other.retransmission_) | 
|  | return other.retransmission_; | 
|  |  | 
|  | return enqueue_order_ > other.enqueue_order_; | 
|  | } | 
|  |  | 
|  | RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} | 
|  | RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default; | 
|  | RoundRobinPacketQueue::Stream::~Stream() {} | 
|  |  | 
|  | RoundRobinPacketQueue::RoundRobinPacketQueue(int64_t start_time_us) | 
|  | : time_last_updated_ms_(start_time_us / 1000) {} | 
|  |  | 
|  | RoundRobinPacketQueue::~RoundRobinPacketQueue() {} | 
|  |  | 
|  | void RoundRobinPacketQueue::Push(int priority, | 
|  | RtpPacketToSend::Type type, | 
|  | uint32_t ssrc, | 
|  | uint16_t seq_number, | 
|  | int64_t capture_time_ms, | 
|  | int64_t enqueue_time_ms, | 
|  | size_t length_in_bytes, | 
|  | bool retransmission, | 
|  | uint64_t enqueue_order) { | 
|  | Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms, | 
|  | enqueue_time_ms, length_in_bytes, retransmission, | 
|  | enqueue_order, enqueue_times_.insert(enqueue_time_ms), | 
|  | absl::nullopt)); | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::Push(int priority, | 
|  | int64_t enqueue_time_ms, | 
|  | uint64_t enqueue_order, | 
|  | std::unique_ptr<RtpPacketToSend> packet) { | 
|  | uint32_t ssrc = packet->Ssrc(); | 
|  | uint16_t sequence_number = packet->SequenceNumber(); | 
|  | int64_t capture_time_ms = packet->capture_time_ms(); | 
|  | size_t size_bytes = packet->payload_size(); | 
|  | auto type = packet->packet_type(); | 
|  | RTC_DCHECK(type.has_value()); | 
|  |  | 
|  | rtp_packets_.push_front(std::move(packet)); | 
|  | Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms, | 
|  | enqueue_time_ms, size_bytes, | 
|  | *type == RtpPacketToSend::Type::kRetransmission, | 
|  | enqueue_order, enqueue_times_.insert(enqueue_time_ms), | 
|  | rtp_packets_.begin())); | 
|  | } | 
|  |  | 
|  | RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() { | 
|  | RTC_CHECK(!pop_packet_ && !pop_stream_); | 
|  |  | 
|  | Stream* stream = GetHighestPriorityStream(); | 
|  | pop_stream_.emplace(stream); | 
|  | pop_packet_.emplace(stream->packet_queue.top()); | 
|  | stream->packet_queue.pop(); | 
|  |  | 
|  | return &pop_packet_.value(); | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::CancelPop() { | 
|  | RTC_CHECK(pop_packet_ && pop_stream_); | 
|  | (*pop_stream_)->packet_queue.push(*pop_packet_); | 
|  | pop_packet_.reset(); | 
|  | pop_stream_.reset(); | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::FinalizePop() { | 
|  | if (!Empty()) { | 
|  | RTC_CHECK(pop_packet_ && pop_stream_); | 
|  | Stream* stream = *pop_stream_; | 
|  | stream_priorities_.erase(stream->priority_it); | 
|  | const QueuedPacket& packet = *pop_packet_; | 
|  |  | 
|  | // Calculate the total amount of time spent by this packet in the queue | 
|  | // while in a non-paused state. Note that the |pause_time_sum_ms_| was | 
|  | // subtracted from |packet.enqueue_time_ms| when the packet was pushed, and | 
|  | // by subtracting it now we effectively remove the time spent in in the | 
|  | // queue while in a paused state. | 
|  | int64_t time_in_non_paused_state_ms = | 
|  | time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_; | 
|  | queue_time_sum_ms_ -= time_in_non_paused_state_ms; | 
|  |  | 
|  | RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end()); | 
|  | enqueue_times_.erase(packet.EnqueueTimeIterator()); | 
|  |  | 
|  | auto packet_it = packet.PacketIterator(); | 
|  | if (packet_it) { | 
|  | rtp_packets_.erase(*packet_it); | 
|  | } | 
|  |  | 
|  | // Update |bytes| of this stream. The general idea is that the stream that | 
|  | // has sent the least amount of bytes should have the highest priority. | 
|  | // The problem with that is if streams send with different rates, in which | 
|  | // case a "budget" will be built up for the stream sending at the lower | 
|  | // rate. To avoid building a too large budget we limit |bytes| to be within | 
|  | // kMaxLeading bytes of the stream that has sent the most amount of bytes. | 
|  | stream->bytes = std::max(stream->bytes + packet.size_in_bytes(), | 
|  | max_bytes_ - kMaxLeadingBytes); | 
|  | max_bytes_ = std::max(max_bytes_, stream->bytes); | 
|  |  | 
|  | size_bytes_ -= packet.size_in_bytes(); | 
|  | size_packets_ -= 1; | 
|  | RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); | 
|  |  | 
|  | // If there are packets left to be sent, schedule the stream again. | 
|  | RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); | 
|  | if (stream->packet_queue.empty()) { | 
|  | stream->priority_it = stream_priorities_.end(); | 
|  | } else { | 
|  | int priority = stream->packet_queue.top().priority(); | 
|  | stream->priority_it = stream_priorities_.emplace( | 
|  | StreamPrioKey(priority, stream->bytes), stream->ssrc); | 
|  | } | 
|  |  | 
|  | pop_packet_.reset(); | 
|  | pop_stream_.reset(); | 
|  | } | 
|  | } | 
|  |  | 
|  | bool RoundRobinPacketQueue::Empty() const { | 
|  | RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) || | 
|  | (stream_priorities_.empty() && size_packets_ == 0)); | 
|  | return stream_priorities_.empty(); | 
|  | } | 
|  |  | 
|  | size_t RoundRobinPacketQueue::SizeInPackets() const { | 
|  | return size_packets_; | 
|  | } | 
|  |  | 
|  | uint64_t RoundRobinPacketQueue::SizeInBytes() const { | 
|  | return size_bytes_; | 
|  | } | 
|  |  | 
|  | int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const { | 
|  | if (Empty()) | 
|  | return 0; | 
|  | RTC_CHECK(!enqueue_times_.empty()); | 
|  | return *enqueue_times_.begin(); | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) { | 
|  | RTC_CHECK_GE(timestamp_ms, time_last_updated_ms_); | 
|  | if (timestamp_ms == time_last_updated_ms_) | 
|  | return; | 
|  |  | 
|  | int64_t delta_ms = timestamp_ms - time_last_updated_ms_; | 
|  |  | 
|  | if (paused_) { | 
|  | pause_time_sum_ms_ += delta_ms; | 
|  | } else { | 
|  | queue_time_sum_ms_ += delta_ms * size_packets_; | 
|  | } | 
|  |  | 
|  | time_last_updated_ms_ = timestamp_ms; | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) { | 
|  | if (paused_ == paused) | 
|  | return; | 
|  | UpdateQueueTime(timestamp_ms); | 
|  | paused_ = paused; | 
|  | } | 
|  |  | 
|  | int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const { | 
|  | if (Empty()) | 
|  | return 0; | 
|  | return queue_time_sum_ms_ / size_packets_; | 
|  | } | 
|  |  | 
|  | void RoundRobinPacketQueue::Push(QueuedPacket packet) { | 
|  | auto stream_info_it = streams_.find(packet.ssrc()); | 
|  | if (stream_info_it == streams_.end()) { | 
|  | stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first; | 
|  | stream_info_it->second.priority_it = stream_priorities_.end(); | 
|  | stream_info_it->second.ssrc = packet.ssrc(); | 
|  | } | 
|  |  | 
|  | Stream* stream = &stream_info_it->second; | 
|  |  | 
|  | if (stream->priority_it == stream_priorities_.end()) { | 
|  | // If the SSRC is not currently scheduled, add it to |stream_priorities_|. | 
|  | RTC_CHECK(!IsSsrcScheduled(stream->ssrc)); | 
|  | stream->priority_it = stream_priorities_.emplace( | 
|  | StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); | 
|  | } else if (packet.priority() < stream->priority_it->first.priority) { | 
|  | // If the priority of this SSRC increased, remove the outdated StreamPrioKey | 
|  | // and insert a new one with the new priority. Note that |priority_| uses | 
|  | // lower ordinal for higher priority. | 
|  | stream_priorities_.erase(stream->priority_it); | 
|  | stream->priority_it = stream_priorities_.emplace( | 
|  | StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc()); | 
|  | } | 
|  | RTC_CHECK(stream->priority_it != stream_priorities_.end()); | 
|  |  | 
|  | // In order to figure out how much time a packet has spent in the queue while | 
|  | // not in a paused state, we subtract the total amount of time the queue has | 
|  | // been paused so far, and when the packet is popped we subtract the total | 
|  | // amount of time the queue has been paused at that moment. This way we | 
|  | // subtract the total amount of time the packet has spent in the queue while | 
|  | // in a paused state. | 
|  | UpdateQueueTime(packet.enqueue_time_ms()); | 
|  | packet.SubtractPauseTimeMs(pause_time_sum_ms_); | 
|  |  | 
|  | size_packets_ += 1; | 
|  | size_bytes_ += packet.size_in_bytes(); | 
|  |  | 
|  | stream->packet_queue.push(packet); | 
|  | } | 
|  |  | 
|  | RoundRobinPacketQueue::Stream* | 
|  | RoundRobinPacketQueue::GetHighestPriorityStream() { | 
|  | RTC_CHECK(!stream_priorities_.empty()); | 
|  | uint32_t ssrc = stream_priorities_.begin()->second; | 
|  |  | 
|  | auto stream_info_it = streams_.find(ssrc); | 
|  | RTC_CHECK(stream_info_it != streams_.end()); | 
|  | RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin()); | 
|  | RTC_CHECK(!stream_info_it->second.packet_queue.empty()); | 
|  | return &stream_info_it->second; | 
|  | } | 
|  |  | 
|  | bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const { | 
|  | for (const auto& scheduled_stream : stream_priorities_) { | 
|  | if (scheduled_stream.second == ssrc) | 
|  | return true; | 
|  | } | 
|  | return false; | 
|  | } | 
|  |  | 
|  | }  // namespace webrtc |