blob: f3f6d5142d01fc3ed9b17ee764d9cd1c5ab3b87f [file] [log] [blame]
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/rtp_rtcp/source/rtcp_transceiver_impl.h"
#include <algorithm>
#include <optional>
#include <utility>
#include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "api/video/video_bitrate_allocation.h"
#include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/ntp_time_util.h"
#include "modules/rtp_rtcp/source/rtcp_packet.h"
#include "modules/rtp_rtcp/source/rtcp_packet/bye.h"
#include "modules/rtp_rtcp/source/rtcp_packet/common_header.h"
#include "modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
#include "modules/rtp_rtcp/source/rtcp_packet/fir.h"
#include "modules/rtp_rtcp/source/rtcp_packet/nack.h"
#include "modules/rtp_rtcp/source/rtcp_packet/pli.h"
#include "modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
#include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
#include "modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/checks.h"
#include "rtc_base/containers/flat_map.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/divide_round.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
namespace {
struct SenderReportTimes {
Timestamp local_received_time;
NtpTime remote_sent_time;
};
std::function<void(rtc::ArrayView<const uint8_t>)> GetRtcpTransport(
const RtcpTransceiverConfig& config) {
if (config.rtcp_transport != nullptr) {
return config.rtcp_transport;
}
bool first = true;
std::string log_prefix = config.debug_id;
return [first,
log_prefix](rtc::ArrayView<const uint8_t> /* packet */) mutable {
if (first) {
RTC_LOG(LS_ERROR) << log_prefix << "Sending RTCP packets is disabled.";
first = false;
}
};
}
} // namespace
struct RtcpTransceiverImpl::RemoteSenderState {
uint8_t fir_sequence_number = 0;
std::optional<SenderReportTimes> last_received_sender_report;
std::vector<MediaReceiverRtcpObserver*> observers;
};
struct RtcpTransceiverImpl::LocalSenderState {
uint32_t ssrc;
size_t last_num_sent_bytes = 0;
ReportBlockData report_block;
// Sequence number of the last FIR message per sender SSRC.
flat_map<uint32_t, uint8_t> last_fir;
RtpStreamRtcpHandler* handler = nullptr;
};
// Helper to put several RTCP packets into lower layer datagram composing
// Compound or Reduced-Size RTCP packet, as defined by RFC 5506 section 2.
// TODO(bugs.webrtc.org/8239): When in compound mode and packets are so many
// that several compound RTCP packets need to be generated, ensure each packet
// is compound.
class RtcpTransceiverImpl::PacketSender {
public:
PacketSender(rtcp::RtcpPacket::PacketReadyCallback callback,
size_t max_packet_size)
: callback_(callback), max_packet_size_(max_packet_size) {
RTC_CHECK_LE(max_packet_size, IP_PACKET_SIZE);
}
~PacketSender() { RTC_DCHECK_EQ(index_, 0) << "Unsent rtcp packet."; }
// Appends a packet to pending compound packet.
// Sends rtcp compound packet if buffer was already full and resets buffer.
void AppendPacket(const rtcp::RtcpPacket& packet) {
packet.Create(buffer_, &index_, max_packet_size_, callback_);
}
// Sends pending rtcp compound packet.
void Send() {
if (index_ > 0) {
callback_(rtc::ArrayView<const uint8_t>(buffer_, index_));
index_ = 0;
}
}
bool IsEmpty() const { return index_ == 0; }
private:
const rtcp::RtcpPacket::PacketReadyCallback callback_;
const size_t max_packet_size_;
size_t index_ = 0;
uint8_t buffer_[IP_PACKET_SIZE];
};
RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
: config_(config),
rtcp_transport_(GetRtcpTransport(config_)),
ready_to_send_(config.initial_ready_to_send) {
RTC_CHECK(config_.Validate());
if (ready_to_send_ && config_.schedule_periodic_compound_packets) {
SchedulePeriodicCompoundPackets(config_.initial_report_delay);
}
}
RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
void RtcpTransceiverImpl::AddMediaReceiverRtcpObserver(
uint32_t remote_ssrc,
MediaReceiverRtcpObserver* observer) {
if (config_.receive_statistics == nullptr && remote_senders_.empty()) {
RTC_LOG(LS_WARNING) << config_.debug_id
<< "receive statistic is not set. RTCP report blocks "
"will not be generated.";
}
auto& stored = remote_senders_[remote_ssrc].observers;
RTC_DCHECK(!absl::c_linear_search(stored, observer));
stored.push_back(observer);
}
void RtcpTransceiverImpl::RemoveMediaReceiverRtcpObserver(
uint32_t remote_ssrc,
MediaReceiverRtcpObserver* observer) {
auto remote_sender_it = remote_senders_.find(remote_ssrc);
if (remote_sender_it == remote_senders_.end())
return;
auto& stored = remote_sender_it->second.observers;
auto it = absl::c_find(stored, observer);
if (it == stored.end())
return;
stored.erase(it);
}
bool RtcpTransceiverImpl::AddMediaSender(uint32_t local_ssrc,
RtpStreamRtcpHandler* handler) {
RTC_DCHECK(handler != nullptr);
LocalSenderState state;
state.ssrc = local_ssrc;
state.handler = handler;
local_senders_.push_back(state);
auto it = std::prev(local_senders_.end());
auto [unused, inserted] = local_senders_by_ssrc_.emplace(local_ssrc, it);
if (!inserted) {
local_senders_.pop_back();
return false;
}
return true;
}
bool RtcpTransceiverImpl::RemoveMediaSender(uint32_t local_ssrc) {
auto index_it = local_senders_by_ssrc_.find(local_ssrc);
if (index_it == local_senders_by_ssrc_.end()) {
return false;
}
local_senders_.erase(index_it->second);
local_senders_by_ssrc_.erase(index_it);
return true;
}
void RtcpTransceiverImpl::SetReadyToSend(bool ready) {
if (config_.schedule_periodic_compound_packets) {
if (ready_to_send_ && !ready)
periodic_task_handle_.Stop();
if (!ready_to_send_ && ready) // Restart periodic sending.
SchedulePeriodicCompoundPackets(config_.report_period / 2);
}
ready_to_send_ = ready;
}
void RtcpTransceiverImpl::ReceivePacket(rtc::ArrayView<const uint8_t> packet,
Timestamp now) {
// Report blocks may be spread across multiple sender and receiver reports.
std::vector<ReportBlockData> report_blocks;
while (!packet.empty()) {
rtcp::CommonHeader rtcp_block;
if (!rtcp_block.Parse(packet.data(), packet.size()))
break;
HandleReceivedPacket(rtcp_block, now, report_blocks);
packet = packet.subview(rtcp_block.packet_size());
}
if (!report_blocks.empty()) {
ProcessReportBlocks(now, report_blocks);
}
}
void RtcpTransceiverImpl::SendCompoundPacket() {
if (!ready_to_send_)
return;
SendPeriodicCompoundPacket();
ReschedulePeriodicCompoundPackets();
}
void RtcpTransceiverImpl::SetRemb(int64_t bitrate_bps,
std::vector<uint32_t> ssrcs) {
RTC_DCHECK_GE(bitrate_bps, 0);
bool send_now = config_.send_remb_on_change &&
(!remb_.has_value() || bitrate_bps != remb_->bitrate_bps());
remb_.emplace();
remb_->SetSsrcs(std::move(ssrcs));
remb_->SetBitrateBps(bitrate_bps);
remb_->SetSenderSsrc(config_.feedback_ssrc);
// TODO(bugs.webrtc.org/8239): Move logic from PacketRouter for sending remb
// immideately on large bitrate change when there is one RtcpTransceiver per
// rtp transport.
if (send_now) {
std::optional<rtcp::Remb> remb;
remb.swap(remb_);
SendImmediateFeedback(*remb);
remb.swap(remb_);
}
}
void RtcpTransceiverImpl::UnsetRemb() {
remb_.reset();
}
void RtcpTransceiverImpl::SendNack(uint32_t ssrc,
std::vector<uint16_t> sequence_numbers) {
RTC_DCHECK(!sequence_numbers.empty());
if (!ready_to_send_)
return;
rtcp::Nack nack;
nack.SetSenderSsrc(config_.feedback_ssrc);
nack.SetMediaSsrc(ssrc);
nack.SetPacketIds(std::move(sequence_numbers));
SendImmediateFeedback(nack);
}
void RtcpTransceiverImpl::SendPictureLossIndication(uint32_t ssrc) {
if (!ready_to_send_)
return;
rtcp::Pli pli;
pli.SetSenderSsrc(config_.feedback_ssrc);
pli.SetMediaSsrc(ssrc);
SendImmediateFeedback(pli);
}
void RtcpTransceiverImpl::SendFullIntraRequest(
rtc::ArrayView<const uint32_t> ssrcs,
bool new_request) {
RTC_DCHECK(!ssrcs.empty());
if (!ready_to_send_)
return;
rtcp::Fir fir;
fir.SetSenderSsrc(config_.feedback_ssrc);
for (uint32_t media_ssrc : ssrcs) {
uint8_t& command_seq_num = remote_senders_[media_ssrc].fir_sequence_number;
if (new_request)
command_seq_num += 1;
fir.AddRequestTo(media_ssrc, command_seq_num);
}
SendImmediateFeedback(fir);
}
void RtcpTransceiverImpl::HandleReceivedPacket(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now,
std::vector<ReportBlockData>& report_blocks) {
switch (rtcp_packet_header.type()) {
case rtcp::Bye::kPacketType:
HandleBye(rtcp_packet_header);
break;
case rtcp::SenderReport::kPacketType:
HandleSenderReport(rtcp_packet_header, now, report_blocks);
break;
case rtcp::ReceiverReport::kPacketType:
HandleReceiverReport(rtcp_packet_header, now, report_blocks);
break;
case rtcp::ExtendedReports::kPacketType:
HandleExtendedReports(rtcp_packet_header, now);
break;
case rtcp::Psfb::kPacketType:
HandlePayloadSpecificFeedback(rtcp_packet_header, now);
break;
case rtcp::Rtpfb::kPacketType:
HandleRtpFeedback(rtcp_packet_header, now);
break;
}
}
void RtcpTransceiverImpl::HandleBye(
const rtcp::CommonHeader& rtcp_packet_header) {
rtcp::Bye bye;
if (!bye.Parse(rtcp_packet_header))
return;
auto remote_sender_it = remote_senders_.find(bye.sender_ssrc());
if (remote_sender_it == remote_senders_.end())
return;
for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
observer->OnBye(bye.sender_ssrc());
}
void RtcpTransceiverImpl::HandleSenderReport(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now,
std::vector<ReportBlockData>& report_blocks) {
rtcp::SenderReport sender_report;
if (!sender_report.Parse(rtcp_packet_header))
return;
RemoteSenderState& remote_sender =
remote_senders_[sender_report.sender_ssrc()];
remote_sender.last_received_sender_report = {{now, sender_report.ntp()}};
HandleReportBlocks(sender_report.sender_ssrc(), now,
sender_report.report_blocks(), report_blocks);
for (MediaReceiverRtcpObserver* observer : remote_sender.observers) {
observer->OnSenderReport(sender_report.sender_ssrc(), sender_report.ntp(),
sender_report.rtp_timestamp());
}
}
void RtcpTransceiverImpl::HandleReceiverReport(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now,
std::vector<ReportBlockData>& report_blocks) {
rtcp::ReceiverReport receiver_report;
if (!receiver_report.Parse(rtcp_packet_header)) {
return;
}
HandleReportBlocks(receiver_report.sender_ssrc(), now,
receiver_report.report_blocks(), report_blocks);
}
void RtcpTransceiverImpl::HandleReportBlocks(
uint32_t sender_ssrc,
Timestamp now,
rtc::ArrayView<const rtcp::ReportBlock> rtcp_report_blocks,
std::vector<ReportBlockData>& report_blocks) {
if (rtcp_report_blocks.empty()) {
return;
}
NtpTime now_ntp = config_.clock->ConvertTimestampToNtpTime(now);
uint32_t receive_time_ntp = CompactNtp(now_ntp);
Timestamp now_utc = Clock::NtpToUtc(now_ntp);
for (const rtcp::ReportBlock& block : rtcp_report_blocks) {
std::optional<TimeDelta> rtt;
if (block.last_sr() != 0) {
rtt = CompactNtpRttToTimeDelta(
receive_time_ntp - block.delay_since_last_sr() - block.last_sr());
}
auto sender_it = local_senders_by_ssrc_.find(block.source_ssrc());
if (sender_it != local_senders_by_ssrc_.end()) {
LocalSenderState& state = *sender_it->second;
state.report_block.SetReportBlock(sender_ssrc, block, now_utc, now);
if (rtt.has_value()) {
state.report_block.AddRoundTripTimeSample(*rtt);
}
state.handler->OnReport(state.report_block);
report_blocks.push_back(state.report_block);
} else {
// No registered sender for this report block, still report it to the
// network link.
ReportBlockData report_block;
report_block.SetReportBlock(sender_ssrc, block, now_utc, now);
if (rtt.has_value()) {
report_block.AddRoundTripTimeSample(*rtt);
}
report_blocks.push_back(report_block);
}
}
}
void RtcpTransceiverImpl::HandlePayloadSpecificFeedback(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now) {
switch (rtcp_packet_header.fmt()) {
case rtcp::Fir::kFeedbackMessageType:
HandleFir(rtcp_packet_header);
break;
case rtcp::Pli::kFeedbackMessageType:
HandlePli(rtcp_packet_header);
break;
case rtcp::Psfb::kAfbMessageType:
HandleRemb(rtcp_packet_header, now);
break;
}
}
void RtcpTransceiverImpl::HandleFir(
const rtcp::CommonHeader& rtcp_packet_header) {
rtcp::Fir fir;
if (local_senders_.empty() || !fir.Parse(rtcp_packet_header)) {
return;
}
for (const rtcp::Fir::Request& r : fir.requests()) {
auto it = local_senders_by_ssrc_.find(r.ssrc);
if (it == local_senders_by_ssrc_.end()) {
continue;
}
auto [fir_it, is_new] =
it->second->last_fir.emplace(fir.sender_ssrc(), r.seq_nr);
if (is_new || fir_it->second != r.seq_nr) {
it->second->handler->OnFir(fir.sender_ssrc());
fir_it->second = r.seq_nr;
}
}
}
void RtcpTransceiverImpl::HandlePli(
const rtcp::CommonHeader& rtcp_packet_header) {
rtcp::Pli pli;
if (local_senders_.empty() || !pli.Parse(rtcp_packet_header)) {
return;
}
auto it = local_senders_by_ssrc_.find(pli.media_ssrc());
if (it != local_senders_by_ssrc_.end()) {
it->second->handler->OnPli(pli.sender_ssrc());
}
}
void RtcpTransceiverImpl::HandleRemb(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now) {
rtcp::Remb remb;
if (config_.network_link_observer == nullptr ||
!remb.Parse(rtcp_packet_header)) {
return;
}
config_.network_link_observer->OnReceiverEstimatedMaxBitrate(
now, DataRate::BitsPerSec(remb.bitrate_bps()));
}
void RtcpTransceiverImpl::HandleRtpFeedback(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now) {
switch (rtcp_packet_header.fmt()) {
case rtcp::Nack::kFeedbackMessageType:
HandleNack(rtcp_packet_header);
break;
case rtcp::TransportFeedback::kFeedbackMessageType:
HandleTransportFeedback(rtcp_packet_header, now);
break;
}
}
void RtcpTransceiverImpl::HandleNack(
const rtcp::CommonHeader& rtcp_packet_header) {
rtcp::Nack nack;
if (local_senders_.empty() || !nack.Parse(rtcp_packet_header)) {
return;
}
auto it = local_senders_by_ssrc_.find(nack.media_ssrc());
if (it != local_senders_by_ssrc_.end()) {
it->second->handler->OnNack(nack.sender_ssrc(), nack.packet_ids());
}
}
void RtcpTransceiverImpl::HandleTransportFeedback(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now) {
RTC_DCHECK_EQ(rtcp_packet_header.fmt(),
rtcp::TransportFeedback::kFeedbackMessageType);
if (config_.network_link_observer == nullptr) {
return;
}
rtcp::TransportFeedback feedback;
if (feedback.Parse(rtcp_packet_header)) {
config_.network_link_observer->OnTransportFeedback(now, feedback);
}
}
void RtcpTransceiverImpl::HandleExtendedReports(
const rtcp::CommonHeader& rtcp_packet_header,
Timestamp now) {
rtcp::ExtendedReports extended_reports;
if (!extended_reports.Parse(rtcp_packet_header))
return;
if (config_.reply_to_non_sender_rtt_measurement && extended_reports.rrtr()) {
RrtrTimes& rrtr = received_rrtrs_[extended_reports.sender_ssrc()];
rrtr.received_remote_mid_ntp_time =
CompactNtp(extended_reports.rrtr()->ntp());
rrtr.local_receive_mid_ntp_time =
CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
}
if (extended_reports.dlrr())
HandleDlrr(extended_reports.dlrr(), now);
if (extended_reports.target_bitrate())
HandleTargetBitrate(*extended_reports.target_bitrate(),
extended_reports.sender_ssrc());
}
void RtcpTransceiverImpl::HandleDlrr(const rtcp::Dlrr& dlrr, Timestamp now) {
if (!config_.non_sender_rtt_measurement ||
config_.network_link_observer == nullptr) {
return;
}
// Delay and last_rr are transferred using 32bit compact ntp resolution.
// Convert packet arrival time to same format through 64bit ntp format.
uint32_t receive_time_ntp =
CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
for (const rtcp::ReceiveTimeInfo& rti : dlrr.sub_blocks()) {
if (rti.ssrc != config_.feedback_ssrc)
continue;
uint32_t rtt_ntp = receive_time_ntp - rti.delay_since_last_rr - rti.last_rr;
TimeDelta rtt = CompactNtpRttToTimeDelta(rtt_ntp);
config_.network_link_observer->OnRttUpdate(now, rtt);
}
}
void RtcpTransceiverImpl::ProcessReportBlocks(
Timestamp now,
rtc::ArrayView<const ReportBlockData> report_blocks) {
RTC_DCHECK(!report_blocks.empty());
if (config_.network_link_observer == nullptr) {
return;
}
// Round trip time calculated from different report blocks suppose to be about
// the same, as those blocks should be generated by the same remote sender.
// To avoid too many callbacks, this code accumulate multiple rtts into one.
TimeDelta rtt_sum = TimeDelta::Zero();
size_t num_rtts = 0;
for (const ReportBlockData& report_block : report_blocks) {
if (report_block.has_rtt()) {
rtt_sum += report_block.last_rtt();
++num_rtts;
}
}
if (num_rtts > 0) {
config_.network_link_observer->OnRttUpdate(now, rtt_sum / num_rtts);
}
config_.network_link_observer->OnReport(now, report_blocks);
}
void RtcpTransceiverImpl::HandleTargetBitrate(
const rtcp::TargetBitrate& target_bitrate,
uint32_t remote_ssrc) {
auto remote_sender_it = remote_senders_.find(remote_ssrc);
if (remote_sender_it == remote_senders_.end() ||
remote_sender_it->second.observers.empty())
return;
// Convert rtcp::TargetBitrate to VideoBitrateAllocation.
VideoBitrateAllocation bitrate_allocation;
for (const rtcp::TargetBitrate::BitrateItem& item :
target_bitrate.GetTargetBitrates()) {
if (item.spatial_layer >= kMaxSpatialLayers ||
item.temporal_layer >= kMaxTemporalStreams) {
RTC_DLOG(LS_WARNING)
<< config_.debug_id
<< "Invalid incoming TargetBitrate with spatial layer "
<< item.spatial_layer << ", temporal layer " << item.temporal_layer;
continue;
}
bitrate_allocation.SetBitrate(item.spatial_layer, item.temporal_layer,
item.target_bitrate_kbps * 1000);
}
for (MediaReceiverRtcpObserver* observer : remote_sender_it->second.observers)
observer->OnBitrateAllocation(remote_ssrc, bitrate_allocation);
}
void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets() {
if (!config_.schedule_periodic_compound_packets)
return;
periodic_task_handle_.Stop();
RTC_DCHECK(ready_to_send_);
SchedulePeriodicCompoundPackets(config_.report_period);
}
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(TimeDelta delay) {
periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
config_.task_queue, delay,
[this] {
RTC_DCHECK(config_.schedule_periodic_compound_packets);
RTC_DCHECK(ready_to_send_);
SendPeriodicCompoundPacket();
return config_.report_period;
},
TaskQueueBase::DelayPrecision::kLow, config_.clock);
}
std::vector<uint32_t> RtcpTransceiverImpl::FillReports(
Timestamp now,
ReservedBytes reserved,
PacketSender& rtcp_sender) {
// Sender/receiver reports should be first in the RTCP packet.
RTC_DCHECK(rtcp_sender.IsEmpty());
size_t available_bytes = config_.max_packet_size;
if (reserved.per_packet > available_bytes) {
// Because reserved.per_packet is unsigned, substracting would underflow and
// will not produce desired result.
available_bytes = 0;
} else {
available_bytes -= reserved.per_packet;
}
const size_t sender_report_size_bytes = 28 + reserved.per_sender;
const size_t full_sender_report_size_bytes =
sender_report_size_bytes +
rtcp::SenderReport::kMaxNumberOfReportBlocks * rtcp::ReportBlock::kLength;
size_t max_full_sender_reports =
available_bytes / full_sender_report_size_bytes;
size_t max_report_blocks =
max_full_sender_reports * rtcp::SenderReport::kMaxNumberOfReportBlocks;
size_t available_bytes_for_last_sender_report =
available_bytes - max_full_sender_reports * full_sender_report_size_bytes;
if (available_bytes_for_last_sender_report >= sender_report_size_bytes) {
max_report_blocks +=
(available_bytes_for_last_sender_report - sender_report_size_bytes) /
rtcp::ReportBlock::kLength;
}
std::vector<rtcp::ReportBlock> report_blocks =
CreateReportBlocks(now, max_report_blocks);
// Previous calculation of max number of sender report made space for max
// number of report blocks per sender report, but if number of report blocks
// is low, more sender reports may fit in.
size_t max_sender_reports =
(available_bytes - report_blocks.size() * rtcp::ReportBlock::kLength) /
sender_report_size_bytes;
auto last_handled_sender_it = local_senders_.end();
auto report_block_it = report_blocks.begin();
std::vector<uint32_t> sender_ssrcs;
for (auto it = local_senders_.begin();
it != local_senders_.end() && sender_ssrcs.size() < max_sender_reports;
++it) {
LocalSenderState& rtp_sender = *it;
RtpStreamRtcpHandler::RtpStats stats = rtp_sender.handler->SentStats();
if (stats.num_sent_bytes() < rtp_sender.last_num_sent_bytes) {
RTC_LOG(LS_ERROR) << "Inconsistent SR for SSRC " << rtp_sender.ssrc
<< ". Number of total sent bytes decreased.";
rtp_sender.last_num_sent_bytes = 0;
}
if (stats.num_sent_bytes() == rtp_sender.last_num_sent_bytes) {
// Skip because no RTP packet was send for this SSRC since last report.
continue;
}
rtp_sender.last_num_sent_bytes = stats.num_sent_bytes();
last_handled_sender_it = it;
rtcp::SenderReport sender_report;
sender_report.SetSenderSsrc(rtp_sender.ssrc);
sender_report.SetPacketCount(stats.num_sent_packets());
sender_report.SetOctetCount(stats.num_sent_bytes());
sender_report.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));
RTC_DCHECK_GE(now, stats.last_capture_time());
sender_report.SetRtpTimestamp(
stats.last_rtp_timestamp() +
((now - stats.last_capture_time()) * stats.last_clock_rate())
.seconds());
if (report_block_it != report_blocks.end()) {
size_t num_blocks =
std::min<size_t>(rtcp::SenderReport::kMaxNumberOfReportBlocks,
report_blocks.end() - report_block_it);
std::vector<rtcp::ReportBlock> sub_blocks(report_block_it,
report_block_it + num_blocks);
sender_report.SetReportBlocks(std::move(sub_blocks));
report_block_it += num_blocks;
}
rtcp_sender.AppendPacket(sender_report);
sender_ssrcs.push_back(rtp_sender.ssrc);
}
if (last_handled_sender_it != local_senders_.end()) {
// Rotate `local_senders_` so that the 1st unhandled sender become first in
// the list, and thus will be first to generate rtcp sender report for on
// the next call to `FillReports`.
local_senders_.splice(local_senders_.end(), local_senders_,
local_senders_.begin(),
std::next(last_handled_sender_it));
}
// Calculcate number of receiver reports to attach remaining report blocks to.
size_t num_receiver_reports =
DivideRoundUp(report_blocks.end() - report_block_it,
rtcp::ReceiverReport::kMaxNumberOfReportBlocks);
// In compound mode each RTCP packet has to start with a sender or receiver
// report.
if (config_.rtcp_mode == RtcpMode::kCompound && sender_ssrcs.empty() &&
num_receiver_reports == 0) {
num_receiver_reports = 1;
}
uint32_t sender_ssrc =
sender_ssrcs.empty() ? config_.feedback_ssrc : sender_ssrcs.front();
for (size_t i = 0; i < num_receiver_reports; ++i) {
rtcp::ReceiverReport receiver_report;
receiver_report.SetSenderSsrc(sender_ssrc);
size_t num_blocks =
std::min<size_t>(rtcp::ReceiverReport::kMaxNumberOfReportBlocks,
report_blocks.end() - report_block_it);
std::vector<rtcp::ReportBlock> sub_blocks(report_block_it,
report_block_it + num_blocks);
receiver_report.SetReportBlocks(std::move(sub_blocks));
report_block_it += num_blocks;
rtcp_sender.AppendPacket(receiver_report);
}
// All report blocks should be attached at this point.
RTC_DCHECK_EQ(report_blocks.end() - report_block_it, 0);
return sender_ssrcs;
}
void RtcpTransceiverImpl::CreateCompoundPacket(Timestamp now,
size_t reserved_bytes,
PacketSender& sender) {
RTC_DCHECK(sender.IsEmpty());
ReservedBytes reserved = {.per_packet = reserved_bytes};
std::optional<rtcp::Sdes> sdes;
if (!config_.cname.empty()) {
sdes.emplace();
bool added = sdes->AddCName(config_.feedback_ssrc, config_.cname);
RTC_DCHECK(added) << "Failed to add CNAME " << config_.cname
<< " to RTCP SDES packet.";
reserved.per_packet += sdes->BlockLength();
}
if (remb_.has_value()) {
reserved.per_packet += remb_->BlockLength();
}
std::optional<rtcp::ExtendedReports> xr_with_dlrr;
if (!received_rrtrs_.empty()) {
RTC_DCHECK(config_.reply_to_non_sender_rtt_measurement);
xr_with_dlrr.emplace();
uint32_t now_ntp =
CompactNtp(config_.clock->ConvertTimestampToNtpTime(now));
for (const auto& [ssrc, rrtr_info] : received_rrtrs_) {
rtcp::ReceiveTimeInfo reply;
reply.ssrc = ssrc;
reply.last_rr = rrtr_info.received_remote_mid_ntp_time;
reply.delay_since_last_rr =
now_ntp - rrtr_info.local_receive_mid_ntp_time;
xr_with_dlrr->AddDlrrItem(reply);
}
if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs) {
reserved.per_sender += xr_with_dlrr->BlockLength();
} else {
reserved.per_packet += xr_with_dlrr->BlockLength();
}
}
if (config_.non_sender_rtt_measurement) {
// It looks like bytes for ExtendedReport header are reserved twice, but in
// practice the same RtcpTransceiver won't both produce RRTR (i.e. it is a
// receiver-only) and reply to RRTR (i.e. remote participant is a receiver
// only). If that happen, then `reserved_bytes` would be slightly larger
// than it should, which is not an issue.
// 4 bytes for common RTCP header + 4 bytes for the ExtenedReports header.
reserved.per_packet += (4 + 4 + rtcp::Rrtr::kLength);
}
std::vector<uint32_t> sender_ssrcs = FillReports(now, reserved, sender);
bool has_sender_report = !sender_ssrcs.empty();
uint32_t sender_ssrc =
has_sender_report ? sender_ssrcs.front() : config_.feedback_ssrc;
if (sdes.has_value() && !sender.IsEmpty()) {
sender.AppendPacket(*sdes);
}
if (remb_.has_value()) {
remb_->SetSenderSsrc(sender_ssrc);
sender.AppendPacket(*remb_);
}
if (!has_sender_report && config_.non_sender_rtt_measurement) {
rtcp::ExtendedReports xr_with_rrtr;
xr_with_rrtr.SetSenderSsrc(config_.feedback_ssrc);
rtcp::Rrtr rrtr;
rrtr.SetNtp(config_.clock->ConvertTimestampToNtpTime(now));
xr_with_rrtr.SetRrtr(rrtr);
sender.AppendPacket(xr_with_rrtr);
}
if (xr_with_dlrr.has_value()) {
rtc::ArrayView<const uint32_t> ssrcs(&sender_ssrc, 1);
if (config_.reply_to_non_sender_rtt_mesaurments_on_all_ssrcs &&
!sender_ssrcs.empty()) {
ssrcs = sender_ssrcs;
}
RTC_DCHECK(!ssrcs.empty());
for (uint32_t ssrc : ssrcs) {
xr_with_dlrr->SetSenderSsrc(ssrc);
sender.AppendPacket(*xr_with_dlrr);
}
}
}
void RtcpTransceiverImpl::SendPeriodicCompoundPacket() {
Timestamp now = config_.clock->CurrentTime();
PacketSender sender(rtcp_transport_, config_.max_packet_size);
CreateCompoundPacket(now, /*reserved_bytes=*/0, sender);
sender.Send();
}
void RtcpTransceiverImpl::SendCombinedRtcpPacket(
std::vector<std::unique_ptr<rtcp::RtcpPacket>> rtcp_packets) {
PacketSender sender(rtcp_transport_, config_.max_packet_size);
for (auto& rtcp_packet : rtcp_packets) {
rtcp_packet->SetSenderSsrc(config_.feedback_ssrc);
sender.AppendPacket(*rtcp_packet);
}
sender.Send();
}
void RtcpTransceiverImpl::SendImmediateFeedback(
const rtcp::RtcpPacket& rtcp_packet) {
PacketSender sender(rtcp_transport_, config_.max_packet_size);
// Compound mode requires every sent rtcp packet to be compound, i.e. start
// with a sender or receiver report.
if (config_.rtcp_mode == RtcpMode::kCompound) {
Timestamp now = config_.clock->CurrentTime();
CreateCompoundPacket(now, /*reserved_bytes=*/rtcp_packet.BlockLength(),
sender);
}
sender.AppendPacket(rtcp_packet);
sender.Send();
// If compound packet was sent, delay (reschedule) the periodic one.
if (config_.rtcp_mode == RtcpMode::kCompound)
ReschedulePeriodicCompoundPackets();
}
std::vector<rtcp::ReportBlock> RtcpTransceiverImpl::CreateReportBlocks(
Timestamp now,
size_t num_max_blocks) {
if (!config_.receive_statistics)
return {};
std::vector<rtcp::ReportBlock> report_blocks =
config_.receive_statistics->RtcpReportBlocks(num_max_blocks);
uint32_t last_sr = 0;
uint32_t last_delay = 0;
for (rtcp::ReportBlock& report_block : report_blocks) {
auto it = remote_senders_.find(report_block.source_ssrc());
if (it == remote_senders_.end() ||
!it->second.last_received_sender_report) {
continue;
}
const SenderReportTimes& last_sender_report =
*it->second.last_received_sender_report;
last_sr = CompactNtp(last_sender_report.remote_sent_time);
last_delay =
SaturatedToCompactNtp(now - last_sender_report.local_received_time);
report_block.SetLastSr(last_sr);
report_block.SetDelayLastSr(last_delay);
}
return report_blocks;
}
} // namespace webrtc