Revert "Introduce NetworkQueue interface"
This reverts commit 1cc000835fb5531179c3b0fc7007fd1245bac49d.
Reason for revert: Breaks downstream test (threading issue)
Bug: webrtc:42225697
Original change's description:
> Introduce NetworkQueue interface
>
> The purpose of the interface is to allow network simulations to implement their own queueing.
>
> The existing SimulatedNetwork is refactored to use a NetworkQueue interface.
> Per default a simple LeayBucket is used that has the same behavior as SimulatedNetwork todayIntroduce NetworkQueue interface
>
> Bug: webrtc:42225697
> Change-Id: I9797a716c8a4599eb9e86e97fc6ebb267fee30f6
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/398281
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Commit-Queue: Per Kjellander <perkj@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#45071}
Bug: webrtc:42225697
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Change-Id: Ie5293d6cb489c525f89173bd39ed8221af496d48
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/398941
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Auto-Submit: Per Kjellander <perkj@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#45073}
diff --git a/BUILD.gn b/BUILD.gn
index 1565aeb..fd77e77 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -635,7 +635,6 @@
"api/numerics:numerics_unittests",
"api/task_queue:pending_task_safety_flag_unittests",
"api/test/metrics:metrics_unittests",
- "api/test/network_emulation:network_queue_unittests",
"api/transport:stun_unittest",
"api/transport/rtp:corruption_detection_message_unittest",
"api/video/test:rtc_api_video_unittests",
diff --git a/api/BUILD.gn b/api/BUILD.gn
index 2e5f1a0..0b996da 100644
--- a/api/BUILD.gn
+++ b/api/BUILD.gn
@@ -888,8 +888,6 @@
"../rtc_base:random",
"transport:ecn_marking",
"units:data_rate",
- "units:data_size",
- "units:timestamp",
"//third_party/abseil-cpp/absl/functional:any_invocable",
]
}
@@ -915,7 +913,6 @@
"../rtc_base:socket_address",
"../test/network:simulated_network",
"test/network_emulation",
- "test/network_emulation:network_queue",
"units:data_rate",
"//third_party/abseil-cpp/absl/base:nullability",
"//third_party/abseil-cpp/absl/strings:string_view",
diff --git a/api/test/network_emulation/BUILD.gn b/api/test/network_emulation/BUILD.gn
index de58887..d22cc09 100644
--- a/api/test/network_emulation/BUILD.gn
+++ b/api/test/network_emulation/BUILD.gn
@@ -76,34 +76,3 @@
"../../../test/network:emulated_network",
]
}
-
-rtc_library("network_queue") {
- visibility = [ "*" ]
-
- sources = [
- "leaky_bucket_network_queue.cc",
- "leaky_bucket_network_queue.h",
- "network_queue.h",
- ]
-
- deps = [
- "../..:sequence_checker",
- "../..:simulated_network_api",
- "../../../rtc_base:checks",
- "../../../rtc_base:macromagic",
- "../../units:timestamp",
- ]
-}
-
-rtc_library("network_queue_unittests") {
- sources = [ "leaky_bucket_network_queue_unittest.cc" ]
-
- testonly = true
- deps = [
- ":network_queue",
- "../..:simulated_network_api",
- "../../../test:test_support",
- "../../units:data_size",
- "../../units:timestamp",
- ]
-}
diff --git a/api/test/network_emulation/leaky_bucket_network_queue.cc b/api/test/network_emulation/leaky_bucket_network_queue.cc
deleted file mode 100644
index 43170e0..0000000
--- a/api/test/network_emulation/leaky_bucket_network_queue.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2025 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "api/test/network_emulation/leaky_bucket_network_queue.h"
-
-#include <algorithm>
-#include <cstddef>
-#include <optional>
-#include <vector>
-
-#include "api/sequence_checker.h"
-#include "api/test/simulated_network.h"
-#include "api/units/timestamp.h"
-#include "rtc_base/checks.h"
-
-namespace webrtc {
-
-LeakyBucketNetworkQueue::LeakyBucketNetworkQueue(size_t max_packet_capacity)
- : max_packet_capacity_(
- std::min(max_packet_capacity,
- LeakyBucketNetworkQueue::kMaxPacketCapacity)) {
- sequence_checker_.Detach();
-}
-
-bool LeakyBucketNetworkQueue::EnqueuePacket(
- const PacketInFlightInfo& packet_info) {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- if (max_packet_capacity_ == queue_.size()) {
- return false;
- }
- queue_.push(packet_info);
- return true;
-}
-
-std::optional<PacketInFlightInfo> LeakyBucketNetworkQueue::PeekNextPacket()
- const {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- if (queue_.empty()) {
- return std::nullopt;
- }
- return queue_.front();
-}
-
-std::optional<PacketInFlightInfo> LeakyBucketNetworkQueue::DequeuePacket(
- Timestamp time_now) {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- if (queue_.empty()) {
- return std::nullopt;
- }
- RTC_DCHECK_LE(queue_.front().send_time(), time_now);
- PacketInFlightInfo packet_info = queue_.front();
- queue_.pop();
- return packet_info;
-}
-
-bool LeakyBucketNetworkQueue::empty() const {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- return queue_.empty();
-}
-
-void LeakyBucketNetworkQueue::DropOldestPacket() {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- dropped_packets_.push_back(queue_.front());
- queue_.pop();
-}
-
-std::vector<PacketInFlightInfo>
-LeakyBucketNetworkQueue::DequeueDroppedPackets() {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- std::vector<PacketInFlightInfo> dropped_packets;
- dropped_packets.swap(dropped_packets_);
- return dropped_packets;
-}
-
-} // namespace webrtc
diff --git a/api/test/network_emulation/leaky_bucket_network_queue.h b/api/test/network_emulation/leaky_bucket_network_queue.h
deleted file mode 100644
index 3b13b1f..0000000
--- a/api/test/network_emulation/leaky_bucket_network_queue.h
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Copyright 2025 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef API_TEST_NETWORK_EMULATION_LEAKY_BUCKET_NETWORK_QUEUE_H_
-#define API_TEST_NETWORK_EMULATION_LEAKY_BUCKET_NETWORK_QUEUE_H_
-
-#include <cstddef>
-#include <memory>
-#include <optional>
-#include <queue>
-#include <vector>
-
-#include "api/sequence_checker.h"
-#include "api/test/network_emulation/network_queue.h"
-#include "api/test/simulated_network.h"
-#include "api/units/timestamp.h"
-
-namespace webrtc {
-
-// A network queue that uses a leaky bucket to limit the number of packets that
-// can be queued.
-class LeakyBucketNetworkQueue : public NetworkQueue {
- public:
- constexpr static size_t kMaxPacketCapacity = 100000;
- LeakyBucketNetworkQueue()
- : webrtc::LeakyBucketNetworkQueue(kMaxPacketCapacity) {}
- explicit LeakyBucketNetworkQueue(size_t max_packet_capacity);
-
- bool EnqueuePacket(const PacketInFlightInfo& packet_info) override;
- std::optional<PacketInFlightInfo> PeekNextPacket() const override;
- std::optional<PacketInFlightInfo> DequeuePacket(Timestamp time_now) override;
- std::vector<PacketInFlightInfo> DequeueDroppedPackets() override;
- bool empty() const override;
-
- void DropOldestPacket();
-
- private:
- SequenceChecker sequence_checker_;
- const size_t max_packet_capacity_;
- std::queue<PacketInFlightInfo> queue_;
- std::vector<PacketInFlightInfo> dropped_packets_;
-};
-
-class LeakyBucketNetworkQueueFactory : public NetworkQueueFactory {
- public:
- explicit LeakyBucketNetworkQueueFactory(size_t max_packet_capacity)
- : max_packet_capacity_(max_packet_capacity) {}
- std::unique_ptr<NetworkQueue> CreateQueue() override {
- return std::make_unique<LeakyBucketNetworkQueue>(max_packet_capacity_);
- }
-
- private:
- const size_t max_packet_capacity_;
-};
-} // namespace webrtc
-
-#endif // API_TEST_NETWORK_EMULATION_LEAKY_BUCKET_NETWORK_QUEUE_H_
diff --git a/api/test/network_emulation/leaky_bucket_network_queue_unittest.cc b/api/test/network_emulation/leaky_bucket_network_queue_unittest.cc
deleted file mode 100644
index 8f896a3..0000000
--- a/api/test/network_emulation/leaky_bucket_network_queue_unittest.cc
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright 2025 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "api/test/network_emulation/leaky_bucket_network_queue.h"
-
-#include <optional>
-
-#include "api/test/simulated_network.h"
-#include "api/units/data_size.h"
-#include "api/units/timestamp.h"
-#include "test/gmock.h"
-#include "test/gtest.h"
-
-namespace webrtc {
-namespace {
-
-using ::testing::Field;
-using ::testing::Optional;
-using ::testing::Property;
-
-TEST(LeakyBucketNetworkQueueTest, EnqueuePacketReturnsFalseIfQueueIsFull) {
- LeakyBucketNetworkQueue queue(/*max_packet_capacity=*/1);
- PacketInFlightInfo packet_info(DataSize::Bytes(123), Timestamp::Zero(),
- /*packet_id=*/1);
- EXPECT_TRUE(queue.EnqueuePacket(packet_info));
- EXPECT_FALSE(queue.EnqueuePacket(packet_info));
-}
-
-TEST(LeakyBucketNetworkQueueTest, ReturnsNullOptWhenEmtpy) {
- LeakyBucketNetworkQueue queue(/*max_packet_capacity=*/1);
- EXPECT_TRUE(queue.empty());
- EXPECT_EQ(queue.DequeuePacket(Timestamp::Zero()), std::nullopt);
- EXPECT_EQ(queue.PeekNextPacket(), std::nullopt);
-}
-
-TEST(LeakyBucketNetworkQueueTest, DequeueDoesNotChangePacketInfo) {
- LeakyBucketNetworkQueue queue(/*max_packet_capacity=*/1);
- EXPECT_TRUE(queue.empty());
- PacketInFlightInfo packet_info(DataSize::Bytes(123), Timestamp::Seconds(123),
- /*packet_id=*/1);
- queue.EnqueuePacket(packet_info);
-
- EXPECT_THAT(
- queue.DequeuePacket(Timestamp::Seconds(125)),
- Optional(AllOf(
- Field(&PacketInFlightInfo::packet_id, packet_info.packet_id),
- Property(&PacketInFlightInfo::packet_size, packet_info.packet_size()),
- Property(&PacketInFlightInfo::send_time, packet_info.send_time()))));
-}
-
-} // namespace
-} // namespace webrtc
diff --git a/api/test/network_emulation/network_queue.h b/api/test/network_emulation/network_queue.h
deleted file mode 100644
index 31c3cdd..0000000
--- a/api/test/network_emulation/network_queue.h
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Copyright 2025 The WebRTC Project Authors. All rights reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef API_TEST_NETWORK_EMULATION_NETWORK_QUEUE_H_
-#define API_TEST_NETWORK_EMULATION_NETWORK_QUEUE_H_
-
-#include <memory>
-#include <optional>
-#include <vector>
-
-#include "api/test/simulated_network.h"
-#include "api/units/timestamp.h"
-
-namespace webrtc {
-
-// NetworkQueue defines the interface for a queue used in network simulation.
-// The purpose is to allow for different AQM implementations.
-// A queue should not modify PacketInFlightInfo except for the explicit
-// congestion notification field (ecn).
-class NetworkQueue {
- public:
- virtual ~NetworkQueue() = default;
- // Enqueues a packet.
- // Must return true if the packet is enqueued successfully, false otherwise.
- virtual bool EnqueuePacket(const PacketInFlightInfo& packet_info) = 0;
- // Next packet that can be dequeued.
- virtual std::optional<PacketInFlightInfo> PeekNextPacket() const = 0;
- // Dequeues a packet.
- // or std::nullopt if there are no enqueued packets.
- virtual std::optional<PacketInFlightInfo> DequeuePacket(
- Timestamp time_now) = 0;
-
- // Dequeues all packets that are dropped by the queue itself after being
- // enqueued.
- virtual std::vector<PacketInFlightInfo> DequeueDroppedPackets() = 0;
- virtual bool empty() const = 0;
-};
-
-class NetworkQueueFactory {
- public:
- virtual ~NetworkQueueFactory() = default;
- virtual std::unique_ptr<NetworkQueue> CreateQueue() = 0;
-};
-
-} // namespace webrtc
-#endif // API_TEST_NETWORK_EMULATION_NETWORK_QUEUE_H_
diff --git a/api/test/network_emulation_manager.cc b/api/test/network_emulation_manager.cc
index 41ec9ab..03dcd84 100644
--- a/api/test/network_emulation_manager.cc
+++ b/api/test/network_emulation_manager.cc
@@ -9,15 +9,12 @@
*/
#include "api/test/network_emulation_manager.h"
-#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include "absl/strings/string_view.h"
-#include "api/test/network_emulation/leaky_bucket_network_queue.h"
-#include "api/test/network_emulation/network_queue.h"
#include "api/test/simulated_network.h"
#include "api/units/data_rate.h"
#include "rtc_base/checks.h"
@@ -58,13 +55,6 @@
}
NetworkEmulationManager::SimulatedNetworkNode::Builder&
-NetworkEmulationManager::SimulatedNetworkNode::Builder::queue_factory(
- NetworkQueueFactory& queue_factory) {
- queue_factory_ = &queue_factory;
- return *this;
-}
-
-NetworkEmulationManager::SimulatedNetworkNode::Builder&
NetworkEmulationManager::SimulatedNetworkNode::Builder::delay_ms(
int queue_delay_ms) {
config_.queue_delay_ms = queue_delay_ms;
@@ -153,21 +143,8 @@
uint64_t random_seed) const {
RTC_CHECK(net);
RTC_CHECK(net_ == nullptr || net_ == net);
- std::unique_ptr<NetworkQueue> network_queue;
- if (queue_factory_ != nullptr) {
- network_queue = queue_factory_->CreateQueue();
- } else {
- size_t max_packet_capacity =
- /*max_packet_capacity=*/config_.queue_length_packets > 0
- ? config_.queue_length_packets - 1 // -1 to account for the
- // packet in the capacity link.
- : LeakyBucketNetworkQueue::kMaxPacketCapacity;
- network_queue =
- std::make_unique<LeakyBucketNetworkQueue>(max_packet_capacity);
- }
SimulatedNetworkNode res;
- auto behavior = std::make_unique<SimulatedNetwork>(config_, random_seed,
- std::move(network_queue));
+ auto behavior = std::make_unique<SimulatedNetwork>(config_, random_seed);
res.simulation = behavior.get();
res.node = net->CreateEmulatedNode(std::move(behavior));
return res;
diff --git a/api/test/network_emulation_manager.h b/api/test/network_emulation_manager.h
index c275881..199a60e 100644
--- a/api/test/network_emulation_manager.h
+++ b/api/test/network_emulation_manager.h
@@ -25,7 +25,6 @@
#include "api/field_trials_view.h"
#include "api/test/network_emulation/cross_traffic.h"
#include "api/test/network_emulation/network_emulation_interfaces.h"
-#include "api/test/network_emulation/network_queue.h"
#include "api/test/peer_network_dependencies.h"
#include "api/test/simulated_network.h"
#include "api/test/time_controller.h"
@@ -191,8 +190,6 @@
// Sets the config state, note that this will replace any previously set
// values.
Builder& config(BuiltInNetworkBehaviorConfig config);
- // If set, `queue_factory` must outlive the Builder.
- Builder& queue_factory(NetworkQueueFactory& queue_factory);
Builder& delay_ms(int queue_delay_ms);
Builder& capacity(DataRate link_capacity);
Builder& capacity_kbps(int link_capacity_kbps);
@@ -210,7 +207,6 @@
private:
NetworkEmulationManager* const net_;
BuiltInNetworkBehaviorConfig config_;
- NetworkQueueFactory* queue_factory_ = nullptr;
};
};
virtual ~NetworkEmulationManager() = default;
diff --git a/api/test/simulated_network.h b/api/test/simulated_network.h
index d1a823a..174fc0b 100644
--- a/api/test/simulated_network.h
+++ b/api/test/simulated_network.h
@@ -21,8 +21,6 @@
#include "absl/functional/any_invocable.h"
#include "api/transport/ecn_marking.h"
#include "api/units/data_rate.h"
-#include "api/units/data_size.h"
-#include "api/units/timestamp.h"
namespace webrtc {
@@ -35,24 +33,11 @@
send_time_us(send_time_us),
packet_id(packet_id),
ecn(ecn) {}
- PacketInFlightInfo(DataSize size,
- Timestamp send_time,
- uint64_t packet_id,
- EcnMarking ecn)
- : PacketInFlightInfo(size.bytes(), send_time.us(), packet_id, ecn) {}
- PacketInFlightInfo(DataSize size, Timestamp send_time, uint64_t packet_id)
- : PacketInFlightInfo(size.bytes(),
- send_time.us(),
- packet_id,
- EcnMarking::kNotEct) {}
PacketInFlightInfo(size_t size, int64_t send_time_us, uint64_t packet_id)
: PacketInFlightInfo(size, send_time_us, packet_id, EcnMarking::kNotEct) {
}
- DataSize packet_size() const { return DataSize::Bytes(size); }
- Timestamp send_time() const { return Timestamp::Micros(send_time_us); }
-
size_t size;
int64_t send_time_us;
// Unique identifier for the packet in relation to other packets in flight.
diff --git a/test/network/BUILD.gn b/test/network/BUILD.gn
index 0e1e46b..870b567 100644
--- a/test/network/BUILD.gn
+++ b/test/network/BUILD.gn
@@ -261,7 +261,6 @@
deps = [
"../../api:sequence_checker",
"../../api:simulated_network_api",
- "../../api/test/network_emulation:network_queue",
"../../api/units:data_rate",
"../../api/units:data_size",
"../../api/units:time_delta",
@@ -284,9 +283,7 @@
":simulated_network",
"..:test_support",
"../../api:simulated_network_api",
- "../../api/test/network_emulation:network_queue",
"../../api/units:data_rate",
- "../../api/units:data_size",
"../../api/units:time_delta",
"../../api/units:timestamp",
]
diff --git a/test/network/simulated_network.cc b/test/network/simulated_network.cc
index 7d67efc..da0751f 100644
--- a/test/network/simulated_network.cc
+++ b/test/network/simulated_network.cc
@@ -14,14 +14,11 @@
#include <cmath>
#include <cstdint>
#include <functional>
-#include <memory>
#include <optional>
#include <utility>
#include <vector>
#include "absl/functional/any_invocable.h"
-#include "api/test/network_emulation/leaky_bucket_network_queue.h"
-#include "api/test/network_emulation/network_queue.h"
#include "api/test/simulated_network.h"
#include "api/units/data_rate.h"
#include "api/units/data_size.h"
@@ -59,24 +56,11 @@
} // namespace
-SimulatedNetwork::SimulatedNetwork(Config config,
- uint64_t random_seed,
- std::unique_ptr<NetworkQueue> queue)
- : queue_(std::move(queue)), random_(random_seed) {
+SimulatedNetwork::SimulatedNetwork(Config config, uint64_t random_seed)
+ : random_(random_seed), bursting_(false), last_enqueue_time_us_(0) {
SetConfig(config);
}
-SimulatedNetwork::SimulatedNetwork(Config config, uint64_t random_seed)
- : SimulatedNetwork(
- config,
- random_seed,
- std::make_unique<LeakyBucketNetworkQueue>(
- /*max_packet_capacity=*/config.queue_length_packets > 0
- ? config.queue_length_packets -
- 1 // -1 to account for the
- // packet in the capacity link.
- : LeakyBucketNetworkQueue::kMaxPacketCapacity)) {}
-
SimulatedNetwork::~SimulatedNetwork() = default;
void SimulatedNetwork::SetConfig(const Config& config) {
@@ -109,21 +93,21 @@
Timestamp config_update_time) {
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
- if (capacity_link_.has_value()) {
+ if (!capacity_link_.empty()) {
// Calculate and update how large portion of the packet first in the
// capacity link is left to to send at time `config_update_time`.
const BuiltInNetworkBehaviorConfig& current_config =
GetConfigState().config;
TimeDelta duration_with_current_config =
- config_update_time - capacity_link_->last_update_time;
+ config_update_time - capacity_link_.front().last_update_time;
RTC_DCHECK_GE(duration_with_current_config, TimeDelta::Zero());
- capacity_link_->bits_left_to_send -= std::min(
+ capacity_link_.front().bits_left_to_send -= std::min(
duration_with_current_config.ms() * current_config.link_capacity.kbps(),
- capacity_link_->bits_left_to_send);
- capacity_link_->last_update_time = config_update_time;
+ capacity_link_.front().bits_left_to_send);
+ capacity_link_.front().last_update_time = config_update_time;
}
SetConfig(new_config);
- UpdateCapacityLink(GetConfigState(), config_update_time);
+ UpdateCapacityQueue(GetConfigState(), config_update_time);
if (UpdateNextProcessTime() && next_process_time_changed_callback_) {
next_process_time_changed_callback_();
}
@@ -142,6 +126,7 @@
bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
+
// Check that old packets don't get enqueued, the SimulatedNetwork expect that
// the packets' send time is monotonically increasing. The tolerance for
// non-monotonic enqueue events is 0.5 ms because on multi core systems
@@ -152,7 +137,6 @@
// At the moment, we see more than 130ms between non-monotonic events, which
// is more than expected.
// RTC_DCHECK_GE(packet.send_time_us - last_enqueue_time_us_, -2000);
- last_enqueue_time_us_ = packet.send_time_us;
ConfigState state = GetConfigState();
@@ -160,26 +144,28 @@
// possible.
packet.size += state.config.packet_overhead;
- Timestamp enqueue_time = packet.send_time();
- bool packet_enqueued = queue_->EnqueuePacket(packet);
- // A packet can not enter the narrow section before the last packet has exit.
- if (capacity_link_.has_value()) {
- // A packet is already in the capacity link. Wait until it exits.
- return packet_enqueued;
+ // If `queue_length_packets` is 0, the queue size is infinite.
+ if (state.config.queue_length_packets > 0 &&
+ capacity_link_.size() >= state.config.queue_length_packets) {
+ // Too many packet on the link, drop this one.
+ return false;
}
- PacketInFlightInfo next_packet = packet;
- if (!queue_->empty()) {
- next_packet = *queue_->DequeuePacket(enqueue_time);
- }
- Timestamp arrival_time = CalculateArrivalTime(
- std::max(next_packet.send_time(), last_capacity_link_exit_time_),
- packet.size * 8, state.config.link_capacity);
- capacity_link_ = {
- .packet = next_packet,
- .last_update_time = enqueue_time,
- .bits_left_to_send = 8 * static_cast<int64_t>(next_packet.size),
- .arrival_time = arrival_time};
+ // Note that arrival time will be updated when previous packets are dequeued
+ // from the capacity link.
+ // A packet can not enter the narrow section before the last packet has exit.
+ Timestamp enqueue_time = Timestamp::Micros(packet.send_time_us);
+ Timestamp arrival_time =
+ capacity_link_.empty()
+ ? CalculateArrivalTime(
+ std::max(enqueue_time, last_capacity_link_exit_time_),
+ packet.size * 8, state.config.link_capacity)
+ : Timestamp::PlusInfinity();
+ capacity_link_.push(
+ {.packet = packet,
+ .last_update_time = enqueue_time,
+ .bits_left_to_send = 8 * static_cast<int64_t>(packet.size),
+ .arrival_time = arrival_time});
// Only update `next_process_time_` if not already set. Otherwise,
// next_process_time_ is calculated when a packet is dequeued. Note that this
@@ -188,8 +174,11 @@
// config.delay_standard_deviation_ms is set.
// TODO(bugs.webrtc.org/14525): Consider preventing this.
if (next_process_time_.IsInfinite() && arrival_time.IsFinite()) {
+ RTC_DCHECK_EQ(capacity_link_.size(), 1);
next_process_time_ = arrival_time;
}
+
+ last_enqueue_time_us_ = packet.send_time_us;
return true;
}
@@ -201,19 +190,24 @@
return std::nullopt;
}
-void SimulatedNetwork::UpdateCapacityLink(ConfigState state,
- Timestamp time_now) {
- if (capacity_link_.has_value()) {
- // Recalculate the arrival time of the packet currently in the capacity link
- // since it may have changed if the capacity has changed.
- capacity_link_->last_update_time = std::max(
- capacity_link_->last_update_time, last_capacity_link_exit_time_);
- capacity_link_->arrival_time = CalculateArrivalTime(
- capacity_link_->last_update_time, capacity_link_->bits_left_to_send,
- state.config.link_capacity);
+void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
+ Timestamp time_now) {
+ // Only the first packet in capacity_link_ have a calculated arrival time
+ // (when packet leave the narrow section), and time when it entered the narrow
+ // section. Also, the configuration may have changed. Thus we need to
+ // calculate the arrival time again before maybe moving the packet to the
+ // delay link.
+ if (!capacity_link_.empty()) {
+ capacity_link_.front().last_update_time = std::max(
+ capacity_link_.front().last_update_time, last_capacity_link_exit_time_);
+ capacity_link_.front().arrival_time = CalculateArrivalTime(
+ capacity_link_.front().last_update_time,
+ capacity_link_.front().bits_left_to_send, state.config.link_capacity);
}
- if (!capacity_link_.has_value() || time_now < capacity_link_->arrival_time) {
+ // The capacity link is empty or the first packet is not expected to exit yet.
+ if (capacity_link_.empty() ||
+ time_now < capacity_link_.front().arrival_time) {
return;
}
bool reorder_packets = false;
@@ -221,9 +215,9 @@
do {
// Time to get this packet (the original or just updated arrival_time is
// smaller or equal to time_now_us).
- PacketInfo packet = *capacity_link_;
+ PacketInfo packet = capacity_link_.front();
RTC_DCHECK(packet.arrival_time.IsFinite());
- capacity_link_ = std::nullopt;
+ capacity_link_.pop();
// If the network is paused, the pause will be implemented as an extra delay
// to be spent in the `delay_link_` queue.
@@ -233,8 +227,8 @@
}
// Store the original arrival time, before applying packet loss or extra
- // delay. This is needed to know when it is possible for the next packet
- // in the queue to start transmitting.
+ // delay. This is needed to know when it is the first available time the
+ // next packet in the `capacity_link_` queue can start transmitting.
last_capacity_link_exit_time_ = packet.arrival_time;
// Drop packets at an average rate of `state.config.loss_percent` with
@@ -271,24 +265,19 @@
delay_link_.emplace_back(packet);
// If there are no packets in the queue, there is nothing else to do.
- std::optional<PacketInFlightInfo> peek_packet = queue_->PeekNextPacket();
- if (!peek_packet) {
+ if (capacity_link_.empty()) {
break;
}
- // It is possible that the next packet in the queue has a send time (at
- // least in tests) after the previous packet left the capacity link.
- Timestamp next_start =
- std::max(last_capacity_link_exit_time_, peek_packet->send_time());
- std::optional<PacketInFlightInfo> next_packet =
- queue_->DequeuePacket(next_start);
- capacity_link_ = {
- .packet = *next_packet,
- .last_update_time = next_start,
- .bits_left_to_send = 8 * static_cast<int64_t>(next_packet->size),
- .arrival_time = CalculateArrivalTime(next_start, next_packet->size * 8,
- state.config.link_capacity)};
+ // If instead there is another packet in the `capacity_link_` queue, let's
+ // calculate its arrival_time based on the latest config (which might
+ // have been changed since it was enqueued).
+ Timestamp next_start = std::max(last_capacity_link_exit_time_,
+ capacity_link_.front().last_update_time);
+ capacity_link_.front().arrival_time =
+ CalculateArrivalTime(next_start, capacity_link_.front().packet.size * 8,
+ state.config.link_capacity);
// And if the next packet in the queue needs to exit, let's dequeue it.
- } while (capacity_link_->arrival_time <= time_now);
+ } while (capacity_link_.front().arrival_time <= time_now);
if (state.config.allow_reordering && reorder_packets) {
// Packets arrived out of order and since the network config allows
@@ -311,13 +300,9 @@
RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
Timestamp receive_time = Timestamp::Micros(receive_time_us);
- UpdateCapacityLink(GetConfigState(), receive_time);
+ UpdateCapacityQueue(GetConfigState(), receive_time);
std::vector<PacketDeliveryInfo> packets_to_deliver;
- for (const PacketInFlightInfo& packet : queue_->DequeueDroppedPackets()) {
- packets_to_deliver.emplace_back(packet, PacketDeliveryInfo::kNotReceived);
- }
-
// Check the extra delay queue.
while (!delay_link_.empty() &&
receive_time >= delay_link_.front().arrival_time) {
@@ -346,8 +331,8 @@
break;
}
}
- if (next_process_time_.IsInfinite() && capacity_link_.has_value()) {
- next_process_time_ = capacity_link_->arrival_time;
+ if (next_process_time_.IsInfinite() && !capacity_link_.empty()) {
+ next_process_time_ = capacity_link_.front().arrival_time;
}
return next_process_time != next_process_time_;
}
diff --git a/test/network/simulated_network.h b/test/network/simulated_network.h
index 0fdb28c..7abf7ed 100644
--- a/test/network/simulated_network.h
+++ b/test/network/simulated_network.h
@@ -15,13 +15,12 @@
#include <cstdint>
#include <deque>
#include <functional>
-#include <memory>
#include <optional>
+#include <queue>
#include <vector>
#include "absl/functional/any_invocable.h"
#include "api/sequence_checker.h"
-#include "api/test/network_emulation/network_queue.h"
#include "api/test/simulated_network.h"
#include "api/units/timestamp.h"
#include "rtc_base/race_checker.h"
@@ -40,15 +39,11 @@
// packet through at the time with a limited capacity.
// - Extra delay with or without packets reorder
// - Packet overhead
-// Per default a simple leaky bucket queue is used that allows setting a max
-// capacity. But more advanced AQM can be used.
+// - Queue max capacity
class RTC_EXPORT SimulatedNetwork : public SimulatedNetworkInterface {
public:
using Config = BuiltInNetworkBehaviorConfig;
explicit SimulatedNetwork(Config config, uint64_t random_seed = 1);
- SimulatedNetwork(Config config,
- uint64_t random_seed,
- std::unique_ptr<NetworkQueue> queue);
~SimulatedNetwork() override;
// Sets a new configuration. This will affect packets that will be sent with
@@ -112,7 +107,7 @@
// Moves packets from capacity- to delay link.
// If `previouse_config` is set, it is the config that was used until
// `time_now_us`
- void UpdateCapacityLink(ConfigState state, Timestamp time_now)
+ void UpdateCapacityQueue(ConfigState state, Timestamp time_now)
RTC_RUN_ON(&process_checker_);
ConfigState GetConfigState() const;
@@ -121,13 +116,18 @@
// Guards the data structures involved in delay and loss processing, such as
// the packet queues.
RaceChecker process_checker_;
-
- // Queue of packets that have not yet entered the capacity link.
- std::unique_ptr<webrtc::NetworkQueue> queue_;
- // Models the capacity of the network. There can only be one packet at the
- // time in the capacity link. The time spend in the capacity link depends on
- // the link capacity.
- std::optional<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_checker_);
+ // Models the capacity of the network by rejecting packets if the queue is
+ // full and keeping them in the queue until they are ready to exit (according
+ // to the link capacity, which cannot be violated, e.g. a 1 kbps link will
+ // only be able to deliver 1000 bits per second).
+ //
+ // Invariant:
+ // The head of the `capacity_link_` has arrival_time correctly set to the
+ // time when the packet is supposed to be delivered (without accounting
+ // potential packet loss or potential extra delay and without accounting for a
+ // new configuration of the network, which requires a re-computation of the
+ // arrival_time).
+ std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_checker_);
// Models the extra delay of the network (see `queue_delay_ms`
// and `delay_standard_deviation_ms` in BuiltInNetworkBehaviorConfig), packets
// in the `delay_link_` have technically already left the network and don't
@@ -145,7 +145,7 @@
Random random_ RTC_GUARDED_BY(process_checker_);
// Are we currently dropping a burst of packets?
- bool bursting_ = false;
+ bool bursting_;
// The send time of the last enqueued packet, this is only used to check that
// the send time of enqueued packets is monotonically increasing.
diff --git a/test/network/simulated_network_unittest.cc b/test/network/simulated_network_unittest.cc
index 23ca9c5..9c59ded 100644
--- a/test/network/simulated_network_unittest.cc
+++ b/test/network/simulated_network_unittest.cc
@@ -11,15 +11,11 @@
#include <cstddef>
#include <cstdint>
-#include <memory>
#include <optional>
-#include <utility>
#include <vector>
-#include "api/test/network_emulation/leaky_bucket_network_queue.h"
#include "api/test/simulated_network.h"
#include "api/units/data_rate.h"
-#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "test/gmock.h"
@@ -29,10 +25,8 @@
namespace {
using ::testing::ElementsAre;
-using ::testing::Field;
using ::testing::MockFunction;
using ::testing::SizeIs;
-using ::testing::UnorderedElementsAre;
PacketInFlightInfo PacketWithSize(size_t size) {
return PacketInFlightInfo(/*size=*/size, /*send_time_us=*/0, /*packet_id=*/1);
@@ -468,7 +462,7 @@
/*receive_time_us=*/TimeDelta::Seconds(5).us());
ASSERT_EQ(delivered_packets.size(), 4ul);
- // And they have been reordered according to the applied extra delay.
+ // And they have been reordered accorting to the applied extra delay.
EXPECT_EQ(delivered_packets[0].packet_id, 3ul);
EXPECT_EQ(delivered_packets[1].packet_id, 1ul);
EXPECT_GE(delivered_packets[1].receive_time_us,
@@ -567,17 +561,18 @@
EXPECT_EQ(delivered_packets.size(), 20ul);
// Results in a burst of lost packets after the first packet lost.
- // With the current random seed, at least 5 packets are lost.
- int num_lost_packets = 0;
+ // With the current random seed, the first 12 are not lost, while the
+ // last 8 are.
+ int current_packet = 0;
for (const auto& packet : delivered_packets) {
- if (packet.receive_time_us == PacketDeliveryInfo::kNotReceived) {
- num_lost_packets++;
- }
- if (num_lost_packets > 0) {
+ if (current_packet < 12) {
+ EXPECT_NE(packet.receive_time_us, PacketDeliveryInfo::kNotReceived);
+ current_packet++;
+ } else {
EXPECT_EQ(packet.receive_time_us, PacketDeliveryInfo::kNotReceived);
+ current_packet++;
}
}
- EXPECT_GT(num_lost_packets, 5);
}
TEST(SimulatedNetworkTest, PauseTransmissionUntil) {
@@ -673,32 +668,6 @@
EXPECT_EQ(delivered_packets[0].receive_time_us, TimeDelta::Seconds(3).us());
}
-TEST(SimulatedNetworkTest, CanUseInjectedQueueAndDropPacketsAtQueueHead) {
- auto queue =
- std::make_unique<LeakyBucketNetworkQueue>(/*max_packet_capacity=*/3);
- LeakyBucketNetworkQueue* queue_ptr = queue.get();
- SimulatedNetwork network =
- SimulatedNetwork({.link_capacity = DataRate::KilobitsPerSec(1)},
- /*random_seed=*/1, std::move(queue));
- ASSERT_TRUE(network.EnqueuePacket(PacketInFlightInfo(
- DataSize::Bytes(125), Timestamp::Seconds(1), /*packet_id=*/0)));
- ASSERT_TRUE(network.EnqueuePacket(PacketInFlightInfo(
- DataSize::Bytes(125), Timestamp::Seconds(1), /*packet_id=*/1)));
-
- // packet 0 is already sent, packet 1 is in the queue and will be dropped.
- queue_ptr->DropOldestPacket();
-
- std::vector<PacketDeliveryInfo> delivered_packets =
- network.DequeueDeliverablePackets(network.NextDeliveryTimeUs().value());
- ASSERT_EQ(delivered_packets.size(), 2ul);
- EXPECT_THAT(
- delivered_packets,
- UnorderedElementsAre(Field(&PacketDeliveryInfo::packet_id, 0),
- AllOf(Field(&PacketDeliveryInfo::packet_id, 1),
- Field(&PacketDeliveryInfo::receive_time_us,
- PacketDeliveryInfo::kNotReceived))));
-}
-
// TODO(bugs.webrtc.org/14525): Re-enable when the DCHECK will be uncommented
// and the non-monotonic events on real time clock tests is solved/understood.
// TEST(SimulatedNetworkDeathTest, EnqueuePacketExpectMonotonicSendTime) {