blob: c8de4e44d9454827b8e94ed83650d4ea2ba138fa [file] [log] [blame]
/*
* Copyright (c) 2025 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "video/timing/simulator/rtc_event_log_driver.h"
#include <cstdint>
#include <memory>
#include <optional>
#include <utility>
#include "absl/base/nullability.h"
#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
#include "api/environment/environment_factory.h"
#include "api/field_trials.h"
#include "api/sequence_checker.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "logging/rtc_event_log/events/logged_rtp_rtcp.h"
#include "logging/rtc_event_log/events/rtc_event_video_receive_stream_config.h"
#include "logging/rtc_event_log/rtc_event_log_parser.h"
#include "modules/rtp_rtcp/source/rtp_packet_received.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "test/time_controller/simulated_time_task_queue_controller.h"
namespace webrtc::video_timing_simulator {
// Builds a driver around `parsed_log`.
//
// The constructor creates an environment backed by the simulated time
// controller (clock and task queue factory), creates the "simulator_queue"
// task queue, and registers handlers with `processor_` for:
//   * every logged video receive stream config, and
//   * every incoming RTP packet that belongs to a non-RTX video stream.
// No events are processed here; that happens in `Simulate()`.
RtcEventLogDriver::RtcEventLogDriver(
    const Config& config,
    const ParsedRtcEventLog* absl_nonnull parsed_log,
    absl::string_view field_trials_string,
    RtcEventLogDriver::StreamInterfaceFactory stream_factory)
    : config_(config),
      time_controller_(Timestamp::Zero()),
      env_(CreateEnvironment(
          std::make_unique<webrtc::FieldTrials>(field_trials_string),
          time_controller_.GetClock(),
          time_controller_.GetTaskQueueFactory())),
      parsed_log_(*parsed_log),
      stream_factory_(std::move(stream_factory)),
      prev_log_timestamp_(std::nullopt),
      simulator_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
          "simulator_queue",
          TaskQueueFactory::Priority::NORMAL)),
      packet_simulator_(env_) {
  RTC_DCHECK(stream_factory_) << "stream_factory must be provided";

  // Receive stream config events.
  processor_.AddEvents(parsed_log_.video_recv_configs(),
                       [this](const auto& logged_config) {
                         OnLoggedVideoRecvConfig(logged_config);
                       });

  // Incoming RTP packets, one registration per eligible ssrc.
  for (const auto& rtp_stream : parsed_log_.incoming_rtp_packets_by_ssrc()) {
    const bool carries_video =
        parsed_log_.GetMediaType(rtp_stream.ssrc,
                                 PacketDirection::kIncomingPacket) ==
        ParsedRtcEventLog::MediaType::VIDEO;
    const bool carries_rtx =
        parsed_log_.incoming_rtx_ssrcs().contains(rtp_stream.ssrc);
    // TODO: b/423646186 - Handle RTX.
    if (carries_video && !carries_rtx) {
      processor_.AddEvents(rtp_stream.incoming_packets,
                           [this](const auto& logged_packet) {
                             OnLoggedRtpPacketIncoming(logged_packet);
                           });
    }
  }
}
// Defaulted destructor: no explicit teardown is performed here; members are
// destroyed by their own destructors.
RtcEventLogDriver::~RtcEventLogDriver() = default;
// Runs the whole simulation: replays every registered event in timestamp
// order, lets simulated time run a little longer, and finally closes and
// drops all streams on the simulator queue.
void RtcEventLogDriver::Simulate() {
  // Dispatch each logged event, in timestamp order, to its handler.
  processor_.ProcessEventsInOrder();

  // Give straggling frames a chance to come out by letting the simulated
  // clock run slightly past the last logged event.
  time_controller_.AdvanceTime(kShutdownAdvanceTimeSlack);

  // Streams are torn down on the simulator queue.
  bool teardown_ran = false;
  simulator_queue_->PostTask([this, &teardown_ran]() {
    RTC_DCHECK_RUN_ON(simulator_queue_.get());
    for (auto& [ssrc, stream] : streams_) {
      stream->Close();
    }
    streams_.clear();
    teardown_ran = true;
  });
  // A zero-length advance runs the task that was just posted.
  time_controller_.AdvanceTime(TimeDelta::Zero());
  RTC_DCHECK(teardown_ran);
}
// Moves the simulated clock forward so that it reads `log_timestamp`.
//
// The advance is computed against the simulated clock itself, not against the
// previously seen log timestamp. The two are identical while timestamps are
// monotonic, but after a timestamp regression the previous-timestamp delta
// would overshoot and leave the simulated clock permanently ahead of log
// time. Working in absolute terms lets the clock re-align with the log as
// soon as the log catches up.
//
// If `log_timestamp` is earlier than the previously handled timestamp, an
// error is logged and the clock is left untouched. `prev_log_timestamp_` is
// still updated, so the regression is reported once per event rather than on
// every call made for that event.
void RtcEventLogDriver::AdvanceTime(Timestamp log_timestamp) {
  const bool went_backwards =
      prev_log_timestamp_.has_value() &&
      (log_timestamp - *prev_log_timestamp_) < TimeDelta::Zero();
  prev_log_timestamp_ = log_timestamp;
  if (went_backwards) {
    RTC_LOG(LS_ERROR)
        << "Non-monotonic sequence of timestamps. Will not advance time."
        << " (simulated_ts=" << env_.clock().CurrentTime() << ")";
    return;
  }

  const TimeDelta remaining = log_timestamp - env_.clock().CurrentTime();
  if (remaining < TimeDelta::Zero()) {
    // The clock is still ahead of the log after an earlier regression. The
    // clock cannot be moved backwards, so only run whatever is already due.
    time_controller_.AdvanceTime(TimeDelta::Zero());
    return;
  }

  // Covers the first event too: the clock is set in absolute terms.
  time_controller_.AdvanceTime(remaining);
  RTC_DCHECK_EQ(env_.clock().CurrentTime(), log_timestamp);
}
// Runs `handler` on the simulator queue at simulated time `log_timestamp`.
//
// First the clock is advanced to the event's timestamp, which executes all
// tasks scheduled before the event. Then `handler` is posted and the queue is
// flushed so that it has run by the time this function returns.
void RtcEventLogDriver::HandleEvent(Timestamp log_timestamp,
                                    absl::AnyInvocable<void() &&> handler) {
  // Run everything that was scheduled before this logged event.
  AdvanceTime(log_timestamp);

  bool handler_ran = false;
  simulator_queue_->PostTask(
      [event_handler = std::move(handler), &handler_ran]() mutable {
        std::move(event_handler)();
        handler_ran = true;
      });

  // Run the logged event itself. The clock was already moved by the call
  // above, so a zero-length advance is all that is needed to execute the
  // posted task.
  time_controller_.AdvanceTime(TimeDelta::Zero());
  RTC_DCHECK(handler_ran) << "Handler was not executed";
}
// Handles a logged video receive stream config by creating a stream for its
// remote ssrc at the config's log time.
//
// If a stream for that ssrc is already present, the outcome depends on
// `config_.reuse_streams`: when set, the existing stream is kept and nothing
// is created; otherwise the existing stream is closed and replaced.
void RtcEventLogDriver::OnLoggedVideoRecvConfig(
    const webrtc::LoggedVideoRecvConfig& config) {
  const uint32_t ssrc = config.config.remote_ssrc;
  HandleEvent(config.log_time(), [this, ssrc]() {
    RTC_DCHECK_RUN_ON(simulator_queue_.get());
    RTC_LOG(LS_INFO) << "OnLoggedVideoRecvConfig for ssrc=" << ssrc
                     << " (simulated_ts=" << env_.clock().CurrentTime() << ")";

    const auto existing = streams_.find(ssrc);
    const bool already_present = existing != streams_.end();

    if (already_present && config_.reuse_streams) {
      RTC_LOG(LS_WARNING)
          << "Video receive stream for ssrc=" << ssrc
          << " already existed. Reusing it."
          << " (simulated_ts=" << env_.clock().CurrentTime() << ")";
      return;
    }

    if (already_present) {
      RTC_LOG(LS_WARNING)
          << "Video receive stream for ssrc=" << ssrc
          << " already existed. Overwriting it."
          << " (simulated_ts=" << env_.clock().CurrentTime() << ")";
      // Close the old stream before it is replaced below.
      existing->second->Close();
    }

    std::unique_ptr<StreamInterface> created = stream_factory_(env_, ssrc);
    RTC_DCHECK(created);
    streams_[ssrc] = std::move(created);
  });
}
// Handles a logged incoming RTP packet at its log time: the packet is turned
// into an `RtpPacketReceived` by the packet simulator and inserted into the
// stream registered for its ssrc. Packets whose ssrc has no registered stream
// are dropped with a warning.
void RtcEventLogDriver::OnLoggedRtpPacketIncoming(
    const webrtc::LoggedRtpPacketIncoming& packet) {
  HandleEvent(packet.log_time(), [this, packet]() {
    RTC_DCHECK_RUN_ON(simulator_queue_.get());

    const auto target = streams_.find(packet.rtp.header.ssrc);
    if (target == streams_.end()) {
      RTC_LOG(LS_WARNING) << "Received packet for unknown ssrc="
                          << packet.rtp.header.ssrc
                          << " (simulated_ts=" << env_.clock().CurrentTime()
                          << ")";
      return;
    }

    RtpPacketReceived simulated_packet =
        packet_simulator_.SimulateRtpPacketReceived(packet.rtp);
    // Both the simulated packet and the simulated clock are expected to be
    // at the packet's log time.
    RTC_DCHECK_EQ(simulated_packet.arrival_time(), packet.log_time());
    RTC_DCHECK_EQ(env_.clock().CurrentTime(), packet.log_time());
    target->second->InsertPacket(simulated_packet);
  });
}
} // namespace webrtc::video_timing_simulator