Improve PacketArrivalTimeMap perfomance
replace std::deque implementation with a manually controlled circular buffer.
replace Timestamp validity check from 'IsInfinite()' accesser to cheaper comparison to zero.
These greatly increase PacketArrivalTimeMap::AddPacket perfomance when packet arrive with large sequence number gaps.
Bug: chromium:1349880
Change-Id: I6f4e814b1086ca9d0b48608531e3a387d9e542dc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/270564
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37722}
diff --git a/modules/remote_bitrate_estimator/packet_arrival_map.cc b/modules/remote_bitrate_estimator/packet_arrival_map.cc
index 09c9e5a..16d400e 100644
--- a/modules/remote_bitrate_estimator/packet_arrival_map.cc
+++ b/modules/remote_bitrate_estimator/packet_arrival_map.cc
@@ -10,118 +10,184 @@
#include "modules/remote_bitrate_estimator/packet_arrival_map.h"
#include <algorithm>
+#include <cstdint>
-#include "rtc_base/numerics/safe_minmax.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/checks.h"
namespace webrtc {
-constexpr size_t PacketArrivalTimeMap::kMaxNumberOfPackets;
-
void PacketArrivalTimeMap::AddPacket(int64_t sequence_number,
Timestamp arrival_time) {
RTC_DCHECK_GE(arrival_time, Timestamp::Zero());
- if (!has_seen_packet_) {
+ if (!has_seen_packet()) {
// First packet.
- has_seen_packet_ = true;
+ Reallocate(kMinCapacity);
begin_sequence_number_ = sequence_number;
- arrival_times_.push_back(arrival_time);
+ end_sequence_number_ = sequence_number + 1;
+ arrival_times_[Index(sequence_number)] = arrival_time;
return;
}
- int64_t pos = sequence_number - begin_sequence_number_;
- if (pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size())) {
+ if (sequence_number >= begin_sequence_number() &&
+ sequence_number < end_sequence_number()) {
// The packet is within the buffer - no need to expand it.
- arrival_times_[pos] = arrival_time;
+ arrival_times_[Index(sequence_number)] = arrival_time;
return;
}
- if (pos < 0) {
+ if (sequence_number < begin_sequence_number()) {
// The packet goes before the current buffer. Expand to add packet, but only
// if it fits within kMaxNumberOfPackets.
- size_t missing_packets = -pos;
- if (missing_packets + arrival_times_.size() > kMaxNumberOfPackets) {
+ int64_t new_size = end_sequence_number() - sequence_number;
+ if (new_size > kMaxNumberOfPackets) {
// Don't expand the buffer further, as that would remove newly received
// packets.
return;
}
+ AdjustToSize(new_size);
- arrival_times_.insert(arrival_times_.begin(), missing_packets,
- Timestamp::MinusInfinity());
- arrival_times_[0] = arrival_time;
+ arrival_times_[Index(sequence_number)] = arrival_time;
+ SetNotReceived(sequence_number + 1, begin_sequence_number_);
begin_sequence_number_ = sequence_number;
return;
}
// The packet goes after the buffer.
+ RTC_DCHECK_GE(sequence_number, end_sequence_number_);
+ int64_t new_end_sequence_number = sequence_number + 1;
- if (static_cast<size_t>(pos) >= kMaxNumberOfPackets) {
- // The buffer grows too large - old packets have to be removed.
- size_t packets_to_remove = pos - kMaxNumberOfPackets + 1;
- if (packets_to_remove >= arrival_times_.size()) {
- arrival_times_.clear();
- begin_sequence_number_ = sequence_number;
- pos = 0;
- } else {
- // Also trim the buffer to remove leading non-received packets, to
- // ensure that the buffer only spans received packets.
- while (packets_to_remove < arrival_times_.size() &&
- arrival_times_[packets_to_remove].IsInfinite()) {
- ++packets_to_remove;
- }
-
- arrival_times_.erase(arrival_times_.begin(),
- arrival_times_.begin() + packets_to_remove);
- begin_sequence_number_ += packets_to_remove;
- pos -= packets_to_remove;
- RTC_DCHECK_GE(pos, 0);
- }
+ if (new_end_sequence_number >= end_sequence_number_ + kMaxNumberOfPackets) {
+ // All old packets have to be removed.
+ begin_sequence_number_ = sequence_number;
+ end_sequence_number_ = new_end_sequence_number;
+ arrival_times_[Index(sequence_number)] = arrival_time;
+ return;
}
+ if (begin_sequence_number_ < new_end_sequence_number - kMaxNumberOfPackets) {
+ // Remove oldest entries
+ begin_sequence_number_ = new_end_sequence_number - kMaxNumberOfPackets;
+ RTC_DCHECK_GT(end_sequence_number_, begin_sequence_number_);
+ // Also trim the buffer to remove leading non-received packets, to
+ // ensure that the buffer only spans received packets.
+ TrimLeadingNotReceivedEntries();
+ }
+
+ AdjustToSize(new_end_sequence_number - begin_sequence_number_);
+
// Packets can be received out-of-order. If this isn't the next expected
// packet, add enough placeholders to fill the gap.
- size_t missing_gap_packets = pos - arrival_times_.size();
- if (missing_gap_packets > 0) {
- arrival_times_.insert(arrival_times_.end(), missing_gap_packets,
- Timestamp::MinusInfinity());
+ SetNotReceived(end_sequence_number_, sequence_number);
+ end_sequence_number_ = new_end_sequence_number;
+ arrival_times_[Index(sequence_number)] = arrival_time;
+}
+
+void PacketArrivalTimeMap::TrimLeadingNotReceivedEntries() {
+ const int begin_index = Index(begin_sequence_number_);
+ const Timestamp* const begin_it = arrival_times_.get() + begin_index;
+ const Timestamp* const end_it = arrival_times_.get() + capacity();
+
+ for (const Timestamp* it = begin_it; it != end_it; ++it) {
+ if (*it >= Timestamp::Zero()) {
+ begin_sequence_number_ += (it - begin_it);
+ return;
+ }
}
- RTC_DCHECK_EQ(arrival_times_.size(), pos);
- arrival_times_.push_back(arrival_time);
- RTC_DCHECK_LE(arrival_times_.size(), kMaxNumberOfPackets);
+ // Reached end of the arrival_times_ and all entries represent not received
+ // packets. Remove them.
+ begin_sequence_number_ += (capacity() - begin_index);
+ // Continue removing entries at the beginning of the circular buffer.
+ for (const Timestamp* it = arrival_times_.get(); it != begin_it; ++it) {
+ if (*it >= Timestamp::Zero()) {
+ begin_sequence_number_ += (it - arrival_times_.get());
+ return;
+ }
+ }
+
+ RTC_DCHECK_NOTREACHED() << "There should be at least one non-empty entry";
+}
+
+void PacketArrivalTimeMap::SetNotReceived(
+ int64_t begin_sequence_number_inclusive,
+ int64_t end_sequence_number_exclusive) {
+ static constexpr Timestamp value = Timestamp::MinusInfinity();
+
+ int begin_index = Index(begin_sequence_number_inclusive);
+ int end_index = Index(end_sequence_number_exclusive);
+
+ if (begin_index <= end_index) {
+ // Entries to clear are in single block:
+ // [......{-----}....]
+ std::fill(arrival_times_.get() + begin_index,
+ arrival_times_.get() + end_index, value);
+ } else {
+ // Entries to clear span across arrival_times_ border:
+ // [--}..........{---]
+ std::fill(arrival_times_.get() + begin_index,
+ arrival_times_.get() + capacity(), value);
+ std::fill(arrival_times_.get(), arrival_times_.get() + end_index, value);
+ }
}
void PacketArrivalTimeMap::RemoveOldPackets(int64_t sequence_number,
Timestamp arrival_time_limit) {
- while (!arrival_times_.empty() && begin_sequence_number_ < sequence_number &&
- arrival_times_.front() <= arrival_time_limit) {
- arrival_times_.pop_front();
+ int64_t check_to = std::min(sequence_number, end_sequence_number_);
+ while (begin_sequence_number_ < check_to &&
+ arrival_times_[Index(begin_sequence_number_)] <= arrival_time_limit) {
++begin_sequence_number_;
}
-}
-
-bool PacketArrivalTimeMap::has_received(int64_t sequence_number) const {
- int64_t pos = sequence_number - begin_sequence_number_;
- if (pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size()) &&
- arrival_times_[pos].IsFinite()) {
- return true;
- }
- return false;
+ AdjustToSize(end_sequence_number_ - begin_sequence_number_);
}
void PacketArrivalTimeMap::EraseTo(int64_t sequence_number) {
- if (sequence_number > begin_sequence_number_) {
- size_t count =
- std::min(static_cast<size_t>(sequence_number - begin_sequence_number_),
- arrival_times_.size());
-
- arrival_times_.erase(arrival_times_.begin(),
- arrival_times_.begin() + count);
- begin_sequence_number_ += count;
+ if (sequence_number < begin_sequence_number_) {
+ return;
}
+ if (sequence_number >= end_sequence_number_) {
+ // Erase all.
+ begin_sequence_number_ = end_sequence_number_;
+ return;
+ }
+ // Remove some.
+ begin_sequence_number_ = sequence_number;
+ RTC_DCHECK(has_received(begin_sequence_number_));
+ AdjustToSize(end_sequence_number_ - begin_sequence_number_);
}
-int64_t PacketArrivalTimeMap::clamp(int64_t sequence_number) const {
- return rtc::SafeClamp(sequence_number, begin_sequence_number(),
- end_sequence_number());
+void PacketArrivalTimeMap::AdjustToSize(int new_size) {
+ if (new_size > capacity()) {
+ int new_capacity = capacity();
+ while (new_capacity < new_size)
+ new_capacity *= 2;
+ Reallocate(new_capacity);
+ }
+ if (capacity() > std::max(kMinCapacity, 4 * new_size)) {
+ int new_capacity = capacity();
+ while (new_capacity > 2 * std::max(new_size, kMinCapacity)) {
+ new_capacity /= 2;
+ }
+ Reallocate(new_capacity);
+ }
+ RTC_DCHECK_LE(new_size, capacity());
+}
+
+void PacketArrivalTimeMap::Reallocate(int new_capacity) {
+ int new_capacity_minus_1 = new_capacity - 1;
+ // Check capacity is a power of 2.
+ RTC_DCHECK_EQ(new_capacity & new_capacity_minus_1, 0);
+ // Create uninitialized memory.
+ // All valid entries should be set by `AddPacket` before use.
+ void* raw = operator new[](new_capacity * sizeof(Timestamp));
+ Timestamp* new_buffer = static_cast<Timestamp*>(raw);
+
+ for (int64_t sequence_number = begin_sequence_number_;
+ sequence_number < end_sequence_number_; ++sequence_number) {
+ new_buffer[sequence_number & new_capacity_minus_1] =
+ arrival_times_[sequence_number & capacity_minus_1_];
+ }
+ arrival_times_.reset(new_buffer);
+ capacity_minus_1_ = new_capacity_minus_1;
}
} // namespace webrtc
diff --git a/modules/remote_bitrate_estimator/packet_arrival_map.h b/modules/remote_bitrate_estimator/packet_arrival_map.h
index 8bda4a8..d489a0c 100644
--- a/modules/remote_bitrate_estimator/packet_arrival_map.h
+++ b/modules/remote_bitrate_estimator/packet_arrival_map.h
@@ -10,9 +10,10 @@
#ifndef MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_
#define MODULES_REMOTE_BITRATE_ESTIMATOR_PACKET_ARRIVAL_MAP_H_
+#include <algorithm>
#include <cstddef>
#include <cstdint>
-#include <deque>
+#include <memory>
#include "api/units/timestamp.h"
#include "rtc_base/checks.h"
@@ -32,10 +33,19 @@
public:
// Impossible to request feedback older than what can be represented by 15
// bits.
- static constexpr size_t kMaxNumberOfPackets = (1 << 15);
+ static constexpr int kMaxNumberOfPackets = (1 << 15);
+
+ PacketArrivalTimeMap() = default;
+ PacketArrivalTimeMap(const PacketArrivalTimeMap&) = delete;
+ PacketArrivalTimeMap& operator=(const PacketArrivalTimeMap&) = delete;
+ ~PacketArrivalTimeMap() = default;
// Indicates if the packet with `sequence_number` has already been received.
- bool has_received(int64_t sequence_number) const;
+ bool has_received(int64_t sequence_number) const {
+ return sequence_number >= begin_sequence_number() &&
+ sequence_number < end_sequence_number() &&
+ arrival_times_[Index(sequence_number)] >= Timestamp::Zero();
+ }
// Returns the sequence number of the first entry in the map, i.e. the
// sequence number that a `begin()` iterator would represent.
@@ -43,21 +53,22 @@
// Returns the sequence number of the element just after the map, i.e. the
// sequence number that an `end()` iterator would represent.
- int64_t end_sequence_number() const {
- return begin_sequence_number_ + arrival_times_.size();
- }
+ int64_t end_sequence_number() const { return end_sequence_number_; }
// Returns an element by `sequence_number`, which must be valid, i.e.
// between [begin_sequence_number, end_sequence_number).
Timestamp get(int64_t sequence_number) {
- int64_t pos = sequence_number - begin_sequence_number_;
- RTC_DCHECK(pos >= 0 && pos < static_cast<int64_t>(arrival_times_.size()));
- return arrival_times_[pos];
+ RTC_DCHECK_GE(sequence_number, begin_sequence_number());
+ RTC_DCHECK_LT(sequence_number, end_sequence_number());
+ return arrival_times_[Index(sequence_number)];
}
// Clamps `sequence_number` between [begin_sequence_number,
// end_sequence_number].
- int64_t clamp(int64_t sequence_number) const;
+ int64_t clamp(int64_t sequence_number) const {
+ return std::clamp(sequence_number, begin_sequence_number(),
+ end_sequence_number());
+ }
// Erases all elements from the beginning of the map until `sequence_number`.
void EraseTo(int64_t sequence_number);
@@ -71,17 +82,44 @@
void RemoveOldPackets(int64_t sequence_number, Timestamp arrival_time_limit);
private:
- // Deque representing unwrapped sequence number -> time, where the index +
- // `begin_sequence_number_` represents the packet's sequence number.
- std::deque<Timestamp> arrival_times_;
+ static constexpr int kMinCapacity = 128;
- // The unwrapped sequence number for the first element in
- // `arrival_times_`.
+ // Returns index in the `arrival_times_` for value for `sequence_number`.
+ int Index(int64_t sequence_number) const {
+ // Note that sequence_number might be negative, thus taking '%' requires
+ // extra handling and can be slow. Because capacity is a power of two, it
+ // is much faster to use '&' operator.
+ return sequence_number & capacity_minus_1_;
+ }
+
+ void SetNotReceived(int64_t begin_sequence_number_inclusive,
+ int64_t end_sequence_number_exclusive);
+
+ void TrimLeadingNotReceivedEntries();
+
+ // Adjust capacity to match new_size, may reduce capacity.
+ // On return guarantees capacity >= new_size.
+ void AdjustToSize(int new_size);
+ void Reallocate(int new_capacity);
+
+ int capacity() const { return capacity_minus_1_ + 1; }
+ bool has_seen_packet() const { return arrival_times_ != nullptr; }
+
+ // Circular buffer. Packet with sequence number `sequence_number`
+ // is stored in the slot `sequence_number % capacity_`
+ std::unique_ptr<Timestamp[]> arrival_times_ = nullptr;
+
+ // Allocated size of the `arrival_times_`
+ // capacity_ is a power of 2 in range [kMinCapacity, kMaxNumberOfPackets]
+ // `capacity - 1` is used much more often than `capacity`, thus that value is
+ // stored.
+ int capacity_minus_1_ = -1;
+
+ // The unwrapped sequence number for valid range of sequence numbers.
+ // arrival_times_ entries only valid for sequence numbers in range
+ // `begin_sequence_number_ <= sequence_number < end_sequence_number_`
int64_t begin_sequence_number_ = 0;
-
- // Indicates if this map has had any packet added to it. The first packet
- // decides the initial sequence number.
- bool has_seen_packet_ = false;
+ int64_t end_sequence_number_ = 0;
};
} // namespace webrtc
diff --git a/modules/remote_bitrate_estimator/packet_arrival_map_test.cc b/modules/remote_bitrate_estimator/packet_arrival_map_test.cc
index c083daa..00c927f 100644
--- a/modules/remote_bitrate_estimator/packet_arrival_map_test.cc
+++ b/modules/remote_bitrate_estimator/packet_arrival_map_test.cc
@@ -102,8 +102,7 @@
EXPECT_EQ(map.begin_sequence_number(), 43);
EXPECT_EQ(map.end_sequence_number(), kLargeSeq + 1);
- EXPECT_EQ(static_cast<size_t>(map.end_sequence_number() -
- map.begin_sequence_number()),
+ EXPECT_EQ(map.end_sequence_number() - map.begin_sequence_number(),
PacketArrivalTimeMap::kMaxNumberOfPackets);
EXPECT_FALSE(map.has_received(41));
diff --git a/modules/remote_bitrate_estimator/remote_estimator_proxy.cc b/modules/remote_bitrate_estimator/remote_estimator_proxy.cc
index 1bb2f55..b83720d 100644
--- a/modules/remote_bitrate_estimator/remote_estimator_proxy.cc
+++ b/modules/remote_bitrate_estimator/remote_estimator_proxy.cc
@@ -82,7 +82,7 @@
void RemoteEstimatorProxy::IncomingPacket(int64_t arrival_time_ms,
size_t payload_size,
const RTPHeader& header) {
- if (arrival_time_ms < 0 || arrival_time_ms > kMaxTimeMs) {
+ if (arrival_time_ms < 0 || arrival_time_ms >= kMaxTimeMs) {
RTC_LOG(LS_WARNING) << "Arrival time out of bounds: " << arrival_time_ms;
return;
}
@@ -292,7 +292,7 @@
for (int64_t seq = start_seq; seq < end_seq; ++seq) {
Timestamp arrival_time = packet_arrival_times_.get(seq);
- if (arrival_time.IsInfinite()) {
+ if (arrival_time < Timestamp::Zero()) {
// Packet not received.
continue;
}