blob: 836ae1f60ae892fed06a12f5288bcb781031e07a [file] [log] [blame]
/*
* Copyright 2025 The WebRTC Project Authors. All rights reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "test/network/token_bucket_network_behavior.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "api/function_view.h"
#include "api/sequence_checker.h"
#include "api/test/network_emulation/network_queue.h"
#include "api/test/network_emulation/token_bucket_network_behavior_config.h"
#include "api/test/simulated_network.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/checks.h"
#include "rtc_base/synchronization/mutex.h"
namespace webrtc {
TokenBucketNetworkBehavior::TokenBucketNetworkBehavior(
const TokenBucketNetworkBehaviorConfig& config)
: TokenBucketNetworkBehavior(config, nullptr) {}
TokenBucketNetworkBehavior::TokenBucketNetworkBehavior(
const TokenBucketNetworkBehaviorConfig& config,
std::unique_ptr<NetworkQueue> queue)
: config_(config), queue_(std::move(queue)), token_bucket_(config.burst) {
sequence_checker_.Detach();
}
bool TokenBucketNetworkBehavior::EnqueuePacket(PacketInFlightInfo packet_info) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
bool result = false;
Timestamp time_now = packet_info.send_time();
RefillTokensSinceLastProcess(time_now);
if (queue_ != nullptr) {
result = queue_->EnqueuePacket(packet_info);
if (next_delivery_time_.IsInfinite()) {
next_delivery_time_ =
CalculateNextDequeueTime(time_now, queue_->PeekNextPacket());
}
} else {
// no queue.
Timestamp next_delivery_time =
CalculateNextDequeueTime(time_now, packet_info);
if (next_delivery_time == time_now) {
// There is enough tokens to deliver the packet immediately.
PrepareToDeliverPacket(time_now, packet_info);
return true;
}
}
return result;
}
void TokenBucketNetworkBehavior::PrepareToDeliverPacket(
Timestamp time_now,
const PacketInFlightInfo& packet_to_deliver) {
token_bucket_ -= packet_to_deliver.packet_size();
PacketDeliveryInfo packet(packet_to_deliver, time_now.us());
deliverable_packets_.push_back(packet);
next_delivery_time_ = time_now;
}
void TokenBucketNetworkBehavior::RefillTokensSinceLastProcess(
Timestamp time_now) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (!last_process_time_) {
last_process_time_ = time_now;
}
// Refill token bucket.
TimeDelta time_delta = time_now - *last_process_time_;
webrtc::MutexLock lock(&config_lock_);
if (time_delta > TimeDelta::Zero()) {
token_bucket_ += config_.rate * time_delta;
token_bucket_ = std::min(token_bucket_, config_.burst);
}
last_process_time_ = time_now;
}
Timestamp TokenBucketNetworkBehavior::CalculateNextDequeueTime(
Timestamp time_now,
std::optional<PacketInFlightInfo> packet_info) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
if (!packet_info.has_value()) {
return Timestamp::PlusInfinity();
}
if (packet_info->packet_size() <= token_bucket_) {
return time_now;
}
MutexLock lock(&config_lock_);
if (config_.rate == DataRate::Zero()) {
return Timestamp::PlusInfinity();
}
TimeDelta time_to_enough_tokens =
(packet_info->packet_size() - token_bucket_) / config_.rate;
return time_now + time_to_enough_tokens;
}
std::vector<PacketDeliveryInfo>
TokenBucketNetworkBehavior::DequeueDeliverablePackets(int64_t time_now_us) {
Timestamp time_now = Timestamp::Micros(time_now_us);
RTC_DCHECK_RUN_ON(&sequence_checker_);
RefillTokensSinceLastProcess(time_now);
next_delivery_time_ = Timestamp::PlusInfinity();
if (queue_ != nullptr) {
while (CalculateNextDequeueTime(time_now, queue_->PeekNextPacket()) <=
time_now) {
std::optional<PacketInFlightInfo> packet =
queue_->DequeuePacket(time_now);
RTC_CHECK(packet.has_value());
PrepareToDeliverPacket(time_now, *packet);
}
for (const auto& packet_in_flight_info : queue_->DequeueDroppedPackets()) {
PacketDeliveryInfo packet(packet_in_flight_info,
PacketDeliveryInfo::kNotReceived);
deliverable_packets_.push_back(packet);
}
next_delivery_time_ =
CalculateNextDequeueTime(time_now, queue_->PeekNextPacket());
}
std::vector<PacketDeliveryInfo> delivered_packets;
delivered_packets.swap(deliverable_packets_);
return delivered_packets;
}
std::optional<int64_t> TokenBucketNetworkBehavior::NextDeliveryTimeUs() const {
RTC_DCHECK_RUN_ON(&sequence_checker_);
return next_delivery_time_.IsFinite()
? std::make_optional<int64_t>(next_delivery_time_.us())
: std::nullopt;
}
void TokenBucketNetworkBehavior::UpdateConfig(
webrtc::FunctionView<void(TokenBucketNetworkBehaviorConfig&)> configurer) {
MutexLock lock(&config_lock_);
configurer(config_);
}
} // namespace webrtc