| /* |
| * Copyright 2018 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| #include "test/scenario/simulated_time.h" |
| |
| #include <inttypes.h> |
| #include <string.h> |
| #include <algorithm> |
| #include <cstdint> |
| #include <utility> |
| |
| #include "absl/memory/memory.h" |
| #include "absl/types/optional.h" |
| #include "api/rtc_event_log/rtc_event_log_factory.h" |
| #include "rtc_base/checks.h" |
| #include "rtc_base/socket_address.h" |
| |
| namespace webrtc { |
| namespace test { |
| namespace { |
// Second argument handed to RtcEventLog::StartLogging() in CreateEventLog();
// presumably the event log output period in milliseconds.
constexpr int kEventLogOutputIntervalMs = 5000;

// Fixed-size plain-data record that FeedbackToBuffer() copies byte-for-byte
// into a packet buffer and FeedbackFromBuffer() reads back. The first packet
// of a report is stored in absolute form; every further packet is stored as an
// offset from the first one.
struct RawFeedbackReportPacket {
  // One additional packet, expressed relative to |first_seq_num| and
  // |first_recv_time_ms|.
  struct Feedback {
    int16_t seq_offset;
    int32_t recv_offset_ms;
  };

  // Largest number of packets a single report can describe: the first packet
  // plus the entries of |feedbacks|.
  static constexpr int MAX_FEEDBACKS = 10;

  // Number of packets described by this report, including the first one.
  uint8_t count;
  int64_t first_seq_num;
  int64_t first_recv_time_ms;
  Feedback feedbacks[MAX_FEEDBACKS - 1];
};
| |
| std::unique_ptr<RtcEventLog> CreateEventLog( |
| TaskQueueFactory* task_queue_factory, |
| LogWriterFactoryInterface* log_writer_factory) { |
| if (!log_writer_factory) { |
| return absl::make_unique<RtcEventLogNull>(); |
| } |
| auto event_log = RtcEventLogFactory(task_queue_factory) |
| .CreateRtcEventLog(RtcEventLog::EncodingType::NewFormat); |
| bool success = event_log->StartLogging(log_writer_factory->Create(".rtc.dat"), |
| kEventLogOutputIntervalMs); |
| RTC_CHECK(success); |
| return event_log; |
| } |
| } // namespace |
| |
| PacketStream::PacketStream(PacketStreamConfig config) : config_(config) {} |
| |
| std::vector<int64_t> PacketStream::PullPackets(Timestamp at_time) { |
| if (next_frame_time_.IsInfinite()) |
| next_frame_time_ = at_time; |
| |
| TimeDelta frame_interval = TimeDelta::seconds(1) / config_.frame_rate; |
| int64_t frame_allowance = (frame_interval * target_rate_).bytes(); |
| |
| if (next_frame_is_keyframe_) { |
| frame_allowance *= config_.keyframe_multiplier; |
| next_frame_is_keyframe_ = false; |
| } |
| |
| std::vector<int64_t> packets; |
| while (at_time >= next_frame_time_) { |
| next_frame_time_ += frame_interval; |
| |
| int64_t frame_size = budget_ + frame_allowance; |
| frame_size = std::max(frame_size, config_.min_frame_size.bytes()); |
| budget_ += frame_allowance - frame_size; |
| |
| int64_t packet_budget = frame_size; |
| int64_t max_packet_size = config_.max_packet_size.bytes(); |
| while (packet_budget > max_packet_size) { |
| packets.push_back(max_packet_size); |
| packet_budget -= max_packet_size; |
| } |
| packets.push_back(packet_budget); |
| } |
| for (int64_t& packet : packets) |
| packet += config_.packet_overhead.bytes(); |
| return packets; |
| } |
| |
| void PacketStream::OnTargetRateUpdate(DataRate target_rate) { |
| target_rate_ = std::min(target_rate, config_.max_data_rate); |
| } |
| |
| SimpleFeedbackReportPacket FeedbackFromBuffer( |
| rtc::CopyOnWriteBuffer raw_buffer) { |
| RTC_CHECK_LE(sizeof(RawFeedbackReportPacket), raw_buffer.size()); |
| const RawFeedbackReportPacket& raw_packet = |
| *reinterpret_cast<const RawFeedbackReportPacket*>(raw_buffer.cdata()); |
| RTC_CHECK_GE(raw_packet.count, 1); |
| SimpleFeedbackReportPacket packet; |
| packet.receive_times.emplace_back(SimpleFeedbackReportPacket::ReceiveInfo{ |
| raw_packet.first_seq_num, Timestamp::ms(raw_packet.first_recv_time_ms)}); |
| for (int i = 1; i < raw_packet.count; ++i) |
| packet.receive_times.emplace_back(SimpleFeedbackReportPacket::ReceiveInfo{ |
| raw_packet.first_seq_num + raw_packet.feedbacks[i - 1].seq_offset, |
| Timestamp::ms(raw_packet.first_recv_time_ms + |
| raw_packet.feedbacks[i - 1].recv_offset_ms)}); |
| return packet; |
| } |
| |
| rtc::CopyOnWriteBuffer FeedbackToBuffer( |
| const SimpleFeedbackReportPacket packet) { |
| RTC_CHECK_LE(packet.receive_times.size(), |
| RawFeedbackReportPacket::MAX_FEEDBACKS); |
| RawFeedbackReportPacket report; |
| report.count = packet.receive_times.size(); |
| RTC_CHECK(!packet.receive_times.empty()); |
| report.first_seq_num = packet.receive_times.front().sequence_number; |
| report.first_recv_time_ms = packet.receive_times.front().receive_time.ms(); |
| |
| for (int i = 1; i < report.count; ++i) { |
| report.feedbacks[i - 1].seq_offset = static_cast<int16_t>( |
| packet.receive_times[i].sequence_number - report.first_seq_num); |
| report.feedbacks[i - 1].recv_offset_ms = static_cast<int32_t>( |
| packet.receive_times[i].receive_time.ms() - report.first_recv_time_ms); |
| } |
| return rtc::CopyOnWriteBuffer(reinterpret_cast<uint8_t*>(&report), |
| sizeof(RawFeedbackReportPacket)); |
| } |
| |
| SimulatedSender::SimulatedSender(EmulatedNetworkNode* send_node, |
| rtc::IPAddress send_receiver_ip) |
| : send_node_(send_node), send_receiver_address_(send_receiver_ip, 0) {} |
| |
| SimulatedSender::~SimulatedSender() {} |
| |
| TransportPacketsFeedback SimulatedSender::PullFeedbackReport( |
| SimpleFeedbackReportPacket packet, |
| Timestamp at_time) { |
| TransportPacketsFeedback report; |
| report.prior_in_flight = data_in_flight_; |
| report.feedback_time = at_time; |
| |
| for (auto& receive_info : packet.receive_times) { |
| // Look up sender side information for all packets up to and including each |
| // packet with feedback in the report. |
| for (; next_feedback_seq_num_ <= receive_info.sequence_number; |
| ++next_feedback_seq_num_) { |
| PacketResult feedback; |
| if (next_feedback_seq_num_ == receive_info.sequence_number) { |
| feedback.receive_time = receive_info.receive_time; |
| } else { |
| // If we did not get any feedback for this packet, mark it as lost by |
| // setting receive time to infinity. Note that this can also happen due |
| // to reordering, we will newer send feedback out of order. In this case |
| // the packet was not really lost, but we don't have that information. |
| feedback.receive_time = Timestamp::PlusInfinity(); |
| } |
| |
| // Looking up send side information. |
| for (auto it = sent_packets_.begin(); it != sent_packets_.end(); ++it) { |
| if (it->sequence_number == next_feedback_seq_num_) { |
| feedback.sent_packet = *it; |
| if (feedback.receive_time.IsFinite()) |
| sent_packets_.erase(it); |
| break; |
| } |
| } |
| |
| data_in_flight_ -= feedback.sent_packet.size; |
| report.packet_feedbacks.push_back(feedback); |
| } |
| } |
| report.data_in_flight = data_in_flight_; |
| return report; |
| } |
| |
| // Applies pacing and congetsion window based on the configuration from the |
| // congestion controller. This is not a complete implementation of the real |
| // pacer but useful for unit tests since it isn't limited to real time. |
| std::vector<SimulatedSender::PacketReadyToSend> |
| SimulatedSender::PaceAndPullSendPackets(Timestamp at_time) { |
| // TODO(srte): Extract the behavior of PacedSender to a threading and time |
| // independent component and use that here to allow a truthful simulation. |
| if (last_update_.IsInfinite()) { |
| pacing_budget_ = 0; |
| } else { |
| TimeDelta delta = at_time - last_update_; |
| pacing_budget_ += (delta * pacer_config_.data_rate()).bytes(); |
| } |
| std::vector<PacketReadyToSend> to_send; |
| while (data_in_flight_ <= max_in_flight_ && pacing_budget_ >= 0 && |
| !packet_queue_.empty()) { |
| PendingPacket pending = packet_queue_.front(); |
| pacing_budget_ -= pending.size; |
| packet_queue_.pop_front(); |
| SentPacket sent; |
| sent.sequence_number = next_sequence_number_++; |
| sent.size = DataSize::bytes(pending.size); |
| data_in_flight_ += sent.size; |
| sent.data_in_flight = data_in_flight_; |
| sent.pacing_info = PacedPacketInfo(); |
| sent.send_time = at_time; |
| sent_packets_.push_back(sent); |
| rtc::CopyOnWriteBuffer packet( |
| std::max<size_t>(pending.size, sizeof(sent.sequence_number))); |
| memcpy(packet.data(), &sent.sequence_number, sizeof(sent.sequence_number)); |
| to_send.emplace_back(PacketReadyToSend{sent, packet}); |
| } |
| pacing_budget_ = std::min<int64_t>(pacing_budget_, 0); |
| last_update_ = at_time; |
| return to_send; |
| } |
| |
| void SimulatedSender::Update(NetworkControlUpdate update) { |
| if (update.pacer_config) |
| pacer_config_ = *update.pacer_config; |
| if (update.congestion_window) |
| max_in_flight_ = *update.congestion_window; |
| } |
| |
| SimulatedFeedback::SimulatedFeedback(SimulatedTimeClientConfig config, |
| rtc::IPAddress return_receiver_ip, |
| EmulatedNetworkNode* return_node) |
| : config_(config), |
| return_receiver_address_(return_receiver_ip, 0), |
| return_node_(return_node) {} |
| |
| // Polls receiver side for a feedback report and sends it to the stream sender |
| // via return_node_, |
| void SimulatedFeedback::OnPacketReceived(EmulatedIpPacket packet) { |
| int64_t sequence_number; |
| memcpy(&sequence_number, packet.cdata(), sizeof(sequence_number)); |
| receive_times_.insert({sequence_number, packet.arrival_time}); |
| if (last_feedback_time_.IsInfinite()) |
| last_feedback_time_ = packet.arrival_time; |
| if (packet.arrival_time >= last_feedback_time_ + config_.feedback.interval) { |
| SimpleFeedbackReportPacket report; |
| for (; next_feedback_seq_num_ <= sequence_number; |
| ++next_feedback_seq_num_) { |
| auto it = receive_times_.find(next_feedback_seq_num_); |
| if (it != receive_times_.end()) { |
| report.receive_times.emplace_back( |
| SimpleFeedbackReportPacket::ReceiveInfo{next_feedback_seq_num_, |
| it->second}); |
| receive_times_.erase(it); |
| } |
| if (report.receive_times.size() >= |
| RawFeedbackReportPacket::MAX_FEEDBACKS) { |
| return_node_->OnPacketReceived( |
| EmulatedIpPacket(packet.to, return_receiver_address_, |
| FeedbackToBuffer(report), packet.arrival_time)); |
| report = SimpleFeedbackReportPacket(); |
| } |
| } |
| if (!report.receive_times.empty()) |
| return_node_->OnPacketReceived( |
| EmulatedIpPacket(packet.to, return_receiver_address_, |
| FeedbackToBuffer(report), packet.arrival_time)); |
| last_feedback_time_ = packet.arrival_time; |
| } |
| } |
| |
| SimulatedTimeClient::SimulatedTimeClient( |
| TimeController* time_controller, |
| std::unique_ptr<LogWriterFactoryInterface> log_writer_factory, |
| SimulatedTimeClientConfig config, |
| std::vector<PacketStreamConfig> stream_configs, |
| std::vector<EmulatedNetworkNode*> send_link, |
| std::vector<EmulatedNetworkNode*> return_link, |
| rtc::IPAddress send_receiver_ip, |
| rtc::IPAddress return_receiver_ip, |
| Timestamp at_time) |
| : log_writer_factory_(std::move(log_writer_factory)), |
| event_log_(CreateEventLog(time_controller->GetTaskQueueFactory(), |
| log_writer_factory_.get())), |
| network_controller_factory_(log_writer_factory_.get(), config.transport), |
| send_link_(send_link), |
| return_link_(return_link), |
| sender_(send_link.front(), send_receiver_ip), |
| feedback_(config, return_receiver_ip, return_link.front()) { |
| current_contraints_.at_time = at_time; |
| current_contraints_.starting_rate = config.transport.rates.start_rate; |
| current_contraints_.min_data_rate = config.transport.rates.min_rate; |
| current_contraints_.max_data_rate = config.transport.rates.max_rate; |
| NetworkControllerConfig initial_config; |
| initial_config.constraints = current_contraints_; |
| initial_config.stream_based_config.max_padding_rate = |
| config.transport.rates.max_padding_rate; |
| initial_config.event_log = event_log_.get(); |
| congestion_controller_ = network_controller_factory_.Create(initial_config); |
| for (auto& stream_config : stream_configs) |
| packet_streams_.emplace_back(new PacketStream(stream_config)); |
| EmulatedNetworkNode::CreateRoute(send_receiver_ip, send_link, &feedback_); |
| EmulatedNetworkNode::CreateRoute(return_receiver_ip, return_link, this); |
| |
| CongestionProcess(at_time); |
| network_controller_factory_.LogCongestionControllerStats(at_time); |
| if (log_writer_factory_) { |
| packet_log_ = log_writer_factory_->Create(".packets.txt"); |
| packet_log_->Write( |
| "transport_seq packet_size send_time recv_time feed_time\n"); |
| } |
| } |
| |
| // Pulls feedback reports from sender side based on the recieved feedback |
| // packet. Updates congestion controller with the resulting report. |
| void SimulatedTimeClient::OnPacketReceived(EmulatedIpPacket packet) { |
| auto report = sender_.PullFeedbackReport(FeedbackFromBuffer(packet.data), |
| packet.arrival_time); |
| for (PacketResult& feedback : report.packet_feedbacks) { |
| if (packet_log_) |
| LogWriteFormat(packet_log_.get(), |
| "%" PRId64 " %" PRId64 " %.3lf %.3lf %.3lf\n", |
| feedback.sent_packet.sequence_number, |
| feedback.sent_packet.size.bytes(), |
| feedback.sent_packet.send_time.seconds<double>(), |
| feedback.receive_time.seconds<double>(), |
| packet.arrival_time.seconds<double>()); |
| } |
| Update(congestion_controller_->OnTransportPacketsFeedback(report)); |
| } |
| SimulatedTimeClient::~SimulatedTimeClient() { |
| } |
| |
| void SimulatedTimeClient::Update(NetworkControlUpdate update) { |
| sender_.Update(update); |
| if (update.target_rate) { |
| // TODO(srte): Implement more realistic distribution of bandwidths between |
| // streams. Either using BitrateAllocationStrategy directly or using |
| // BitrateAllocation. |
| double ratio_per_stream = 1.0 / packet_streams_.size(); |
| DataRate rate_per_stream = |
| update.target_rate->target_rate * ratio_per_stream; |
| target_rate_ = update.target_rate->target_rate; |
| link_capacity_ = update.target_rate->network_estimate.bandwidth; |
| for (auto& stream : packet_streams_) |
| stream->OnTargetRateUpdate(rate_per_stream); |
| } |
| } |
| |
| void SimulatedTimeClient::CongestionProcess(Timestamp at_time) { |
| ProcessInterval msg; |
| msg.at_time = at_time; |
| Update(congestion_controller_->OnProcessInterval(msg)); |
| } |
| |
| void SimulatedTimeClient::PacerProcess(Timestamp at_time) { |
| ProcessFrames(at_time); |
| for (const auto& to_send : sender_.PaceAndPullSendPackets(at_time)) { |
| sender_.send_node_->OnPacketReceived(EmulatedIpPacket( |
| /*from=*/rtc::SocketAddress(), sender_.send_receiver_address_, |
| to_send.data, at_time)); |
| Update(congestion_controller_->OnSentPacket(to_send.send_info)); |
| } |
| } |
| |
| void SimulatedTimeClient::ProcessFrames(Timestamp at_time) { |
| for (auto& stream : packet_streams_) { |
| for (int64_t packet_size : stream->PullPackets(at_time)) { |
| sender_.packet_queue_.push_back( |
| SimulatedSender::PendingPacket{packet_size}); |
| } |
| } |
| } |
| |
| void SimulatedTimeClient::TriggerFakeReroute(Timestamp at_time) { |
| NetworkRouteChange msg; |
| msg.at_time = at_time; |
| msg.constraints = current_contraints_; |
| msg.constraints.at_time = at_time; |
| Update(congestion_controller_->OnNetworkRouteChange(msg)); |
| } |
| |
| TimeDelta SimulatedTimeClient::GetNetworkControllerProcessInterval() const { |
| return network_controller_factory_.GetProcessInterval(); |
| } |
| |
| DataRate SimulatedTimeClient::link_capacity() const { |
| return link_capacity_; |
| } |
| |
| double SimulatedTimeClient::target_rate_kbps() const { |
| return target_rate_.kbps<double>(); |
| } |
| |
| DataRate SimulatedTimeClient::padding_rate() const { |
| return sender_.pacer_config_.pad_rate(); |
| } |
| |
| } // namespace test |
| } // namespace webrtc |