blob: c9917373feb7b80c6cdb88cc1945e133827ae31b [file] [log] [blame]
/*
* Copyright 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "rtc_tools/network_tester/packet_sender.h"
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include "absl/functional/any_invocable.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/task_queue/task_queue_base.h"
#include "rtc_base/time_utils.h"
#include "rtc_tools/network_tester/config_reader.h"
#include "rtc_tools/network_tester/test_controller.h"
namespace webrtc {
namespace {
absl::AnyInvocable<void() &&> SendPacketTask(
PacketSender* packet_sender,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
int64_t target_time_ms = rtc::TimeMillis()) {
return [target_time_ms, packet_sender,
task_safety_flag = std::move(task_safety_flag)]() mutable {
if (task_safety_flag->alive() && packet_sender->IsSending()) {
packet_sender->SendPacket();
target_time_ms += packet_sender->GetSendIntervalMs();
int64_t delay_ms =
std::max(static_cast<int64_t>(0), target_time_ms - rtc::TimeMillis());
TaskQueueBase::Current()->PostDelayedTask(
SendPacketTask(packet_sender, std::move(task_safety_flag),
target_time_ms),
TimeDelta::Millis(delay_ms));
}
};
}
absl::AnyInvocable<void() &&> UpdateTestSettingTask(
PacketSender* packet_sender,
std::unique_ptr<ConfigReader> config_reader,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag) {
return [packet_sender, config_reader = std::move(config_reader),
task_safety_flag = std::move(task_safety_flag)]() mutable {
if (!task_safety_flag->alive()) {
return;
}
if (absl::optional<ConfigReader::Config> config =
config_reader->GetNextConfig()) {
packet_sender->UpdateTestSetting(config->packet_size,
config->packet_send_interval_ms);
TaskQueueBase::Current()->PostDelayedTask(
UpdateTestSettingTask(packet_sender, std::move(config_reader),
std::move(task_safety_flag)),
TimeDelta::Millis(config->execution_time_ms));
} else {
packet_sender->StopSending();
}
};
}
} // namespace
PacketSender::PacketSender(
TestController* test_controller,
webrtc::TaskQueueBase* worker_queue,
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> task_safety_flag,
const std::string& config_file_path)
: packet_size_(0),
send_interval_ms_(0),
sequence_number_(0),
sending_(false),
config_file_path_(config_file_path),
test_controller_(test_controller),
worker_queue_(worker_queue),
task_safety_flag_(task_safety_flag) {}
PacketSender::~PacketSender() = default;
void PacketSender::StartSending() {
worker_queue_checker_.Detach();
worker_queue_->PostTask(SafeTask(task_safety_flag_, [this]() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
sending_ = true;
}));
worker_queue_->PostTask(UpdateTestSettingTask(
this, std::make_unique<ConfigReader>(config_file_path_),
task_safety_flag_));
worker_queue_->PostTask(SendPacketTask(this, task_safety_flag_));
}
void PacketSender::StopSending() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
sending_ = false;
test_controller_->OnTestDone();
}
bool PacketSender::IsSending() const {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
return sending_;
}
void PacketSender::SendPacket() {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
NetworkTesterPacket packet;
packet.set_type(NetworkTesterPacket::TEST_DATA);
packet.set_sequence_number(sequence_number_++);
packet.set_send_timestamp(rtc::TimeMicros());
test_controller_->SendData(packet, packet_size_);
}
int64_t PacketSender::GetSendIntervalMs() const {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
return send_interval_ms_;
}
void PacketSender::UpdateTestSetting(size_t packet_size,
int64_t send_interval_ms) {
RTC_DCHECK_RUN_ON(&worker_queue_checker_);
send_interval_ms_ = send_interval_ms;
packet_size_ = packet_size;
}
} // namespace webrtc