Reduces locking in SimulatedNetwork class.
Bug: webrtc:9883
Change-Id: I07c78fd2dbba5e7481194b0d1aabfb56809ff6fc
Reviewed-on: https://webrtc-review.googlesource.com/c/120612
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26491}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index 8d65aa2..38b2f41 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -287,6 +287,7 @@
"../api/units:time_delta",
"../rtc_base:checks",
"../rtc_base:rtc_base_approved",
+ "../rtc_base:sequenced_task_checker",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/types:optional",
]
diff --git a/call/simulated_network.cc b/call/simulated_network.cc
index 92e945b..9bb8bab 100644
--- a/call/simulated_network.cc
+++ b/call/simulated_network.cc
@@ -31,12 +31,12 @@
void SimulatedNetwork::SetConfig(const SimulatedNetwork::Config& config) {
rtc::CritScope crit(&config_lock_);
- config_ = config; // Shallow copy of the struct.
+ config_state_.config = config; // Shallow copy of the struct.
double prob_loss = config.loss_percent / 100.0;
- if (config_.avg_burst_loss_length == -1) {
+ if (config_state_.config.avg_burst_loss_length == -1) {
// Uniform loss
- prob_loss_bursting_ = prob_loss;
- prob_start_bursting_ = prob_loss;
+ config_state_.prob_loss_bursting = prob_loss;
+ config_state_.prob_start_bursting = prob_loss;
} else {
// Lose packets according to a gilbert-elliot model.
int avg_burst_loss_length = config.avg_burst_loss_length;
@@ -47,29 +47,27 @@
<< " avg_burst_loss_length must be " << min_avg_burst_loss_length + 1
<< " or higher.";
- prob_loss_bursting_ = (1.0 - 1.0 / avg_burst_loss_length);
- prob_start_bursting_ = prob_loss / (1 - prob_loss) / avg_burst_loss_length;
+ config_state_.prob_loss_bursting = (1.0 - 1.0 / avg_burst_loss_length);
+ config_state_.prob_start_bursting =
+ prob_loss / (1 - prob_loss) / avg_burst_loss_length;
}
}
void SimulatedNetwork::PauseTransmissionUntil(int64_t until_us) {
rtc::CritScope crit(&config_lock_);
- pause_transmission_until_us_ = until_us;
+ config_state_.pause_transmission_until_us = until_us;
}
bool SimulatedNetwork::EnqueuePacket(PacketInFlightInfo packet) {
- Config config;
- {
- rtc::CritScope crit(&config_lock_);
- config = config_;
- }
+ RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
+ ConfigState state = GetConfigState();
- UpdateCapacityQueue(packet.send_time_us);
+ UpdateCapacityQueue(state, packet.send_time_us);
- packet.size += config.packet_overhead;
- rtc::CritScope crit(&process_lock_);
- if (config.queue_length_packets > 0 &&
- capacity_link_.size() >= config.queue_length_packets) {
+ packet.size += state.config.packet_overhead;
+
+ if (state.config.queue_length_packets > 0 &&
+ capacity_link_.size() >= state.config.queue_length_packets) {
// Too many packet on the link, drop this one.
return false;
}
@@ -83,123 +81,114 @@
}
absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
- rtc::CritScope crit(&process_lock_);
+ RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
if (!delay_link_.empty())
return delay_link_.begin()->arrival_time_us;
return absl::nullopt;
}
-void SimulatedNetwork::UpdateCapacityQueue(int64_t time_now_us) {
- Config config;
- double prob_loss_bursting;
- double prob_start_bursting;
- int64_t pause_transmission_until_us;
- {
- rtc::CritScope crit(&config_lock_);
- config = config_;
- prob_loss_bursting = prob_loss_bursting_;
- prob_start_bursting = prob_start_bursting_;
- pause_transmission_until_us = pause_transmission_until_us_.value_or(0);
- }
- {
- rtc::CritScope crit(&process_lock_);
- bool needs_sort = false;
+void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
+ int64_t time_now_us) {
+ bool needs_sort = false;
- // Catch for thread races.
- if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us))
- return;
+ // Catch for thread races.
+ if (time_now_us < last_capacity_link_visit_us_.value_or(time_now_us))
+ return;
- int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us);
- // Check the capacity link first.
- while (!capacity_link_.empty()) {
- int64_t time_until_front_exits_us = 0;
- if (config.link_capacity_kbps > 0) {
- int64_t remaining_bits =
- capacity_link_.front().packet.size * 8 - pending_drain_bits_;
- RTC_DCHECK(remaining_bits > 0);
- // Division rounded up - packet not delivered until its last bit is.
- time_until_front_exits_us =
- (1000 * remaining_bits + config.link_capacity_kbps - 1) /
- config.link_capacity_kbps;
- }
-
- if (time_us + time_until_front_exits_us > time_now_us) {
- // Packet at front will not exit yet. Will not enter here on infinite
- // capacity(=0) so no special handling needed.
- pending_drain_bits_ +=
- ((time_now_us - time_us) * config.link_capacity_kbps) / 1000;
- break;
- }
- if (config.link_capacity_kbps > 0) {
- pending_drain_bits_ +=
- (time_until_front_exits_us * config.link_capacity_kbps) / 1000;
- } else {
- // Enough to drain the whole queue.
- pending_drain_bits_ = queue_size_bytes_ * 8;
- }
-
- // Time to get this packet.
- PacketInfo packet = std::move(capacity_link_.front());
- capacity_link_.pop();
-
- time_us += time_until_front_exits_us;
- RTC_DCHECK(time_us >= packet.packet.send_time_us);
- packet.arrival_time_us = std::max(pause_transmission_until_us, time_us);
- queue_size_bytes_ -= packet.packet.size;
- pending_drain_bits_ -= packet.packet.size * 8;
- RTC_DCHECK(pending_drain_bits_ >= 0);
-
- // Drop packets at an average rate of |config_.loss_percent| with
- // and average loss burst length of |config_.avg_burst_loss_length|.
- if ((bursting_ && random_.Rand<double>() < prob_loss_bursting) ||
- (!bursting_ && random_.Rand<double>() < prob_start_bursting)) {
- bursting_ = true;
- packet.arrival_time_us = PacketDeliveryInfo::kNotReceived;
- } else {
- bursting_ = false;
- int64_t arrival_time_jitter_us = std::max(
- random_.Gaussian(config.queue_delay_ms * 1000,
- config.delay_standard_deviation_ms * 1000),
- 0.0);
-
- // If reordering is not allowed then adjust arrival_time_jitter
- // to make sure all packets are sent in order.
- int64_t last_arrival_time_us =
- delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
- if (!config.allow_reordering && !delay_link_.empty() &&
- packet.arrival_time_us + arrival_time_jitter_us <
- last_arrival_time_us) {
- arrival_time_jitter_us =
- last_arrival_time_us - packet.arrival_time_us;
- }
- packet.arrival_time_us += arrival_time_jitter_us;
- if (packet.arrival_time_us >= last_arrival_time_us) {
- last_arrival_time_us = packet.arrival_time_us;
- } else {
- needs_sort = true;
- }
- }
- delay_link_.emplace_back(std::move(packet));
+ int64_t time_us = last_capacity_link_visit_us_.value_or(time_now_us);
+ // Check the capacity link first.
+ while (!capacity_link_.empty()) {
+ int64_t time_until_front_exits_us = 0;
+ if (state.config.link_capacity_kbps > 0) {
+ int64_t remaining_bits =
+ capacity_link_.front().packet.size * 8 - pending_drain_bits_;
+ RTC_DCHECK(remaining_bits > 0);
+ // Division rounded up - packet not delivered until its last bit is.
+ time_until_front_exits_us =
+ (1000 * remaining_bits + state.config.link_capacity_kbps - 1) /
+ state.config.link_capacity_kbps;
}
- last_capacity_link_visit_us_ = time_now_us;
- // Cannot save unused capacity for later.
- pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8);
- if (needs_sort) {
- // Packet(s) arrived out of order, make sure list is sorted.
- std::sort(delay_link_.begin(), delay_link_.end(),
- [](const PacketInfo& p1, const PacketInfo& p2) {
- return p1.arrival_time_us < p2.arrival_time_us;
- });
+ if (time_us + time_until_front_exits_us > time_now_us) {
+ // Packet at front will not exit yet. Will not enter here on infinite
+ // capacity(=0) so no special handling needed.
+ pending_drain_bits_ +=
+ ((time_now_us - time_us) * state.config.link_capacity_kbps) / 1000;
+ break;
}
+ if (state.config.link_capacity_kbps > 0) {
+ pending_drain_bits_ +=
+ (time_until_front_exits_us * state.config.link_capacity_kbps) / 1000;
+ } else {
+ // Enough to drain the whole queue.
+ pending_drain_bits_ = queue_size_bytes_ * 8;
+ }
+
+ // Time to get this packet.
+ PacketInfo packet = std::move(capacity_link_.front());
+ capacity_link_.pop();
+
+ time_us += time_until_front_exits_us;
+ RTC_DCHECK(time_us >= packet.packet.send_time_us);
+ packet.arrival_time_us =
+ std::max(state.pause_transmission_until_us, time_us);
+ queue_size_bytes_ -= packet.packet.size;
+ pending_drain_bits_ -= packet.packet.size * 8;
+ RTC_DCHECK(pending_drain_bits_ >= 0);
+
+ // Drop packets at an average rate of |state.config.loss_percent| with
+ // and average loss burst length of |state.config.avg_burst_loss_length|.
+ if ((bursting_ && random_.Rand<double>() < state.prob_loss_bursting) ||
+ (!bursting_ && random_.Rand<double>() < state.prob_start_bursting)) {
+ bursting_ = true;
+ packet.arrival_time_us = PacketDeliveryInfo::kNotReceived;
+ } else {
+ bursting_ = false;
+ int64_t arrival_time_jitter_us = std::max(
+ random_.Gaussian(state.config.queue_delay_ms * 1000,
+ state.config.delay_standard_deviation_ms * 1000),
+ 0.0);
+
+ // If reordering is not allowed then adjust arrival_time_jitter
+ // to make sure all packets are sent in order.
+ int64_t last_arrival_time_us =
+ delay_link_.empty() ? -1 : delay_link_.back().arrival_time_us;
+ if (!state.config.allow_reordering && !delay_link_.empty() &&
+ packet.arrival_time_us + arrival_time_jitter_us <
+ last_arrival_time_us) {
+ arrival_time_jitter_us = last_arrival_time_us - packet.arrival_time_us;
+ }
+ packet.arrival_time_us += arrival_time_jitter_us;
+ if (packet.arrival_time_us >= last_arrival_time_us) {
+ last_arrival_time_us = packet.arrival_time_us;
+ } else {
+ needs_sort = true;
+ }
+ }
+ delay_link_.emplace_back(std::move(packet));
}
+ last_capacity_link_visit_us_ = time_now_us;
+ // Cannot save unused capacity for later.
+ pending_drain_bits_ = std::min(pending_drain_bits_, queue_size_bytes_ * 8);
+
+ if (needs_sort) {
+ // Packet(s) arrived out of order, make sure list is sorted.
+ std::sort(delay_link_.begin(), delay_link_.end(),
+ [](const PacketInfo& p1, const PacketInfo& p2) {
+ return p1.arrival_time_us < p2.arrival_time_us;
+ });
+ }
+}
+
+SimulatedNetwork::ConfigState SimulatedNetwork::GetConfigState() const {
+ rtc::CritScope crit(&config_lock_);
+ return config_state_;
}
std::vector<PacketDeliveryInfo> SimulatedNetwork::DequeueDeliverablePackets(
int64_t receive_time_us) {
- UpdateCapacityQueue(receive_time_us);
-
- rtc::CritScope crit(&process_lock_);
+ RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
+ UpdateCapacityQueue(GetConfigState(), receive_time_us);
std::vector<PacketDeliveryInfo> packets_to_deliver;
// Check the extra delay queue.
while (!delay_link_.empty() &&
diff --git a/call/simulated_network.h b/call/simulated_network.h
index 4085600..6adb412 100644
--- a/call/simulated_network.h
+++ b/call/simulated_network.h
@@ -18,8 +18,10 @@
#include "absl/types/optional.h"
#include "api/test/simulated_network.h"
#include "rtc_base/critical_section.h"
+#include "rtc_base/race_checker.h"
#include "rtc_base/random.h"
#include "rtc_base/thread_annotations.h"
+#include "rtc_base/thread_checker.h"
namespace webrtc {
@@ -48,39 +50,43 @@
PacketInFlightInfo packet;
int64_t arrival_time_us;
};
+ // Contains current configuration state.
+ struct ConfigState {
+ // Static link configuration.
+ Config config;
+ // The probability to drop the packet if we are currently dropping a
+ // burst of packet
+ double prob_loss_bursting;
+ // The probability to drop a burst of packets.
+ double prob_start_bursting;
+ // Used for temporary delay spikes.
+ int64_t pause_transmission_until_us = 0;
+ };
// Moves packets from capacity- to delay link.
- void UpdateCapacityQueue(int64_t time_now_us);
+ void UpdateCapacityQueue(ConfigState state, int64_t time_now_us)
+ RTC_RUN_ON(&process_checker_);
+ ConfigState GetConfigState() const;
rtc::CriticalSection config_lock_;
- // |process_lock| guards the data structures involved in delay and loss
+ // |process_checker_| guards the data structures involved in delay and loss
// processes, such as the packet queues.
- rtc::CriticalSection process_lock_;
- std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_lock_);
+ rtc::RaceChecker process_checker_;
+ std::queue<PacketInfo> capacity_link_ RTC_GUARDED_BY(process_checker_);
Random random_;
- std::deque<PacketInfo> delay_link_ RTC_GUARDED_BY(process_lock_);
+ std::deque<PacketInfo> delay_link_ RTC_GUARDED_BY(process_checker_);
- // Link configuration.
- Config config_ RTC_GUARDED_BY(config_lock_);
- absl::optional<int64_t> pause_transmission_until_us_
- RTC_GUARDED_BY(config_lock_);
+ ConfigState config_state_ RTC_GUARDED_BY(config_lock_);
// Are we currently dropping a burst of packets?
bool bursting_;
- // The probability to drop the packet if we are currently dropping a
- // burst of packet
- double prob_loss_bursting_ RTC_GUARDED_BY(config_lock_);
-
- // The probability to drop a burst of packets.
- double prob_start_bursting_ RTC_GUARDED_BY(config_lock_);
-
- int64_t queue_size_bytes_ RTC_GUARDED_BY(process_lock_) = 0;
- int64_t pending_drain_bits_ RTC_GUARDED_BY(process_lock_) = 0;
+ int64_t queue_size_bytes_ RTC_GUARDED_BY(process_checker_) = 0;
+ int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0;
absl::optional<int64_t> last_capacity_link_visit_us_
- RTC_GUARDED_BY(process_lock_);
+ RTC_GUARDED_BY(process_checker_);
};
} // namespace webrtc