Introduce PacketReceiver::DeliverRtpPacket and PacketReceier::DeliverRtcpPacket
DeliverRtpPacket use a parsed RTP packet as argument where the RTP extensions are supposed to be known.
This method is implemented in webrt::Call and temporary used by the exising method Call::DeliverRtp, but the idea is to instead avoid extra packet parsing by forwarding a RtpPacketReceived from RtpTransport::DemuxRtpPacket via WebrtcVideoChannel::OnPacketReceived and WebrtcVoiceChannel.
DeliverRtcpPacket is also implemented in Call and is directly used in PeerConnection::InitializeRtcpCallback.
Bug: webrtc:14795, webrtc:7135
Change-Id: Ib6ffe8e1229ac07fa459ee2fc9a0af8455a23bac
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/290401
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39015}
diff --git a/call/call.cc b/call/call.cc
index 3b2283d..218505c 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -23,6 +23,7 @@
#include "absl/functional/bind_front.h"
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
+#include "api/media_types.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/sequence_checker.h"
#include "api/task_queue/pending_task_safety_flag.h"
@@ -33,6 +34,7 @@
#include "call/adaptation/broadcast_resource_listener.h"
#include "call/bitrate_allocator.h"
#include "call/flexfec_receive_stream_impl.h"
+#include "call/packet_receiver.h"
#include "call/receive_time_calculator.h"
#include "call/rtp_stream_receiver_controller.h"
#include "call/rtp_transport_controller_send.h"
@@ -244,6 +246,13 @@
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
+ void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
+
+ void DeliverRtpPacket(
+ MediaType media_type,
+ RtpPacketReceived packet,
+ OnUndemuxablePacketHandler undemuxable_packet_handler) override;
+
void SignalChannelNetworkState(MediaType media, NetworkState state) override;
void OnAudioTransportOverheadChanged(
@@ -1375,60 +1384,102 @@
}
}
-void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) {
- RTC_DCHECK_RUN_ON(network_thread_);
+void Call::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ RTC_DCHECK(IsRtcpPacket(packet));
TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
- // TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the
- // invariant that currently the only call path to this function is via
- // `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand
- // gets called via the channel classes and
- // WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the
- // PeerConnection involvement as well as
- // `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler`
- // and make sure that the flow of packets is consistent from the
- // `RtpTransport` class, via the *Channel and *Engine classes and into Call.
- // This way we'll also know more about the context of the packet.
- RTC_DCHECK_EQ(media_type, MediaType::ANY);
+ receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
+ bool rtcp_delivered = false;
+ for (VideoReceiveStream2* stream : video_receive_streams_) {
+ if (stream->DeliverRtcp(packet.cdata(), packet.size()))
+ rtcp_delivered = true;
+ }
- // TODO(bugs.webrtc.org/11993): This should execute directly on the network
- // thread.
- worker_thread_->PostTask(
- SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
- RTC_DCHECK_RUN_ON(worker_thread_);
+ for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
+ stream->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
- receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
- bool rtcp_delivered = false;
- for (VideoReceiveStream2* stream : video_receive_streams_) {
- if (stream->DeliverRtcp(packet.cdata(), packet.size()))
- rtcp_delivered = true;
- }
+ for (VideoSendStream* stream : video_send_streams_) {
+ stream->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
- for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
- stream->DeliverRtcp(packet.cdata(), packet.size());
- rtcp_delivered = true;
- }
+ for (auto& kv : audio_send_ssrcs_) {
+ kv.second->DeliverRtcp(packet.cdata(), packet.size());
+ rtcp_delivered = true;
+ }
- for (VideoSendStream* stream : video_send_streams_) {
- stream->DeliverRtcp(packet.cdata(), packet.size());
- rtcp_delivered = true;
- }
+ if (rtcp_delivered) {
+ event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(packet));
+ }
+}
- for (auto& kv : audio_send_ssrcs_) {
- kv.second->DeliverRtcp(packet.cdata(), packet.size());
- rtcp_delivered = true;
- }
+void Call::DeliverRtpPacket(
+ MediaType media_type,
+ RtpPacketReceived packet,
+ OnUndemuxablePacketHandler undemuxable_packet_handler) {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ RTC_DCHECK(packet.arrival_time().IsFinite());
- if (rtcp_delivered) {
- event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
- rtc::MakeArrayView(packet.cdata(), packet.size())));
- }
- }));
+ if (receive_time_calculator_) {
+ int64_t packet_time_us = packet.arrival_time().us();
+ // Repair packet_time_us for clock resets by comparing a new read of
+ // the same clock (TimeUTCMicros) to a monotonic clock reading.
+ packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
+ packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
+ packet.set_arrival_time(Timestamp::Micros(packet_time_us));
+ }
+
+ // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
+ // These are empty (zero length payload) RTP packets with an unsignaled
+ // payload type.
+ const bool is_keep_alive_packet = packet.payload_size() == 0;
+ RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
+ is_keep_alive_packet);
+ NotifyBweOfReceivedPacket(packet, media_type);
+
+ if (media_type != MediaType::AUDIO && media_type != MediaType::VIDEO) {
+ RTC_DCHECK(is_keep_alive_packet);
+ return;
+ }
+
+ RtpStreamReceiverController& receiver_controller =
+ media_type == MediaType::AUDIO ? audio_receiver_controller_
+ : video_receiver_controller_;
+
+ if (!receiver_controller.OnRtpPacket(packet)) {
+ // Demuxing failed. Allow the caller to create a
+ // receive stream in order to handle unsignalled SSRCs and try again.
+ // Note that we dont want to call NotifyBweOfReceivedPacket twice per
+ // packet.
+ if (!undemuxable_packet_handler(packet)) {
+ return;
+ }
+ if (!receiver_controller.OnRtpPacket(packet)) {
+ RTC_LOG(LS_INFO) << "Failed to demux packet " << packet.Ssrc();
+ return;
+ }
+ }
+ event_log_->Log(std::make_unique<RtcEventRtpPacketIncoming>(packet));
+
+ // RateCounters expect input parameter as int, save it as int,
+ // instead of converting each time it is passed to RateCounter::Add below.
+ int length = static_cast<int>(packet.size());
+ if (media_type == MediaType::AUDIO) {
+ receive_stats_.AddReceivedAudioBytes(length, packet.arrival_time());
+ }
+ if (media_type == MediaType::VIDEO) {
+ receive_stats_.AddReceivedVideoBytes(length, packet.arrival_time());
+ }
}
PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) {
+ // TODO(perkj, https://bugs.webrtc.org/7135): Deprecate this method and
+ // direcly use DeliverRtpPacket.
TRACE_EVENT0("webrtc", "Call::DeliverRtp");
RTC_DCHECK_NE(media_type, MediaType::ANY);
@@ -1437,52 +1488,24 @@
return DELIVERY_PACKET_ERROR;
if (packet_time_us != -1) {
- if (receive_time_calculator_) {
- // Repair packet_time_us for clock resets by comparing a new read of
- // the same clock (TimeUTCMicros) to a monotonic clock reading.
- packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
- packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
- }
parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us));
} else {
parsed_packet.set_arrival_time(clock_->CurrentTime());
}
- // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
- // These are empty (zero length payload) RTP packets with an unsignaled
- // payload type.
- const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;
-
- RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
- is_keep_alive_packet);
-
if (!IdentifyReceivedPacket(parsed_packet))
return DELIVERY_UNKNOWN_SSRC;
-
- NotifyBweOfReceivedPacket(parsed_packet, media_type);
-
- // RateCounters expect input parameter as int, save it as int,
- // instead of converting each time it is passed to RateCounter::Add below.
- int length = static_cast<int>(parsed_packet.size());
- if (media_type == MediaType::AUDIO) {
- if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
- receive_stats_.AddReceivedAudioBytes(length,
- parsed_packet.arrival_time());
- event_log_->Log(
- std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
- return DELIVERY_OK;
- }
- } else if (media_type == MediaType::VIDEO) {
+ if (media_type == MediaType::VIDEO) {
parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
- if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
- receive_stats_.AddReceivedVideoBytes(length,
- parsed_packet.arrival_time());
- event_log_->Log(
- std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
- return DELIVERY_OK;
- }
}
- return DELIVERY_UNKNOWN_SSRC;
+ DeliverRtpPacket(media_type, std::move(parsed_packet),
+ [](const webrtc::RtpPacketReceived& packet) {
+ // If IdentifyReceivedPacket returns true, a packet is
+ // expected to be demuxable.
+ RTC_DCHECK_NOTREACHED();
+ return false;
+ });
+ return DELIVERY_OK;
}
PacketReceiver::DeliveryStatus Call::DeliverPacket(
@@ -1491,7 +1514,11 @@
int64_t packet_time_us) {
if (IsRtcpPacket(packet)) {
RTC_DCHECK_RUN_ON(network_thread_);
- DeliverRtcp(media_type, std::move(packet));
+ worker_thread_->PostTask(SafeTask(
+ task_safety_.flag(), [this, packet = std::move(packet)]() mutable {
+ RTC_DCHECK_RUN_ON(worker_thread_);
+ DeliverRtcpPacket(std::move(packet));
+ }));
return DELIVERY_OK;
}
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index c59a63b..445a1e9 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -414,6 +414,11 @@
return status;
}
+void DegradedCall::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+ receive_pipe_->DeliverRtcpPacket(std::move(packet));
+ receive_pipe_->Process();
+}
+
void DegradedCall::SetClientBitratePreferences(
const webrtc::BitrateSettings& preferences) {
call_->SetClientBitratePreferences(preferences);
diff --git a/call/degraded_call.h b/call/degraded_call.h
index 5906e55..bff93e2 100644
--- a/call/degraded_call.h
+++ b/call/degraded_call.h
@@ -116,6 +116,7 @@
DeliveryStatus DeliverPacket(MediaType media_type,
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
+ void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
private:
class FakeNetworkPipeOnTaskQueue {
diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc
index 8a03e0c..6065064 100644
--- a/call/fake_network_pipe.cc
+++ b/call/fake_network_pipe.cc
@@ -184,6 +184,11 @@
: PacketReceiver::DELIVERY_PACKET_ERROR;
}
+void FakeNetworkPipe::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+ EnqueuePacket(std::move(packet), absl::nullopt, true, MediaType::ANY,
+ absl::nullopt);
+}
+
void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) {
MutexLock lock(&config_lock_);
clock_offset_ms_ = offset_ms;
diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h
index be72e91..3a6cb05 100644
--- a/call/fake_network_pipe.h
+++ b/call/fake_network_pipe.h
@@ -149,6 +149,8 @@
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) override;
+ void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
+
// TODO(bugs.webrtc.org/9584): Needed to inherit the alternative signature for
// this method.
using PacketReceiver::DeliverPacket;
diff --git a/call/packet_receiver.h b/call/packet_receiver.h
index 13d3b84..a97bb96 100644
--- a/call/packet_receiver.h
+++ b/call/packet_receiver.h
@@ -10,7 +10,10 @@
#ifndef CALL_PACKET_RECEIVER_H_
#define CALL_PACKET_RECEIVER_H_
+#include "absl/functional/any_invocable.h"
#include "api/media_types.h"
+#include "modules/rtp_rtcp/source/rtp_packet_received.h"
+#include "rtc_base/checks.h"
#include "rtc_base/copy_on_write_buffer.h"
namespace webrtc {
@@ -27,6 +30,28 @@
rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) = 0;
+ // Demux RTCP packets. Must be called on the worker thread.
+ virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+ // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
+ // FakeNetworkPipe.
+ RTC_CHECK_NOTREACHED();
+ }
+
+ // Invoked once when a packet packet is received that can not be demuxed.
+ // If the method returns true, a new attempt is made to demux the packet.
+ using OnUndemuxablePacketHandler =
+ absl::AnyInvocable<bool(const RtpPacketReceived& parsed_packet)>;
+
+ // Demux RTP packets. Must be called on the worker thread.
+ virtual void DeliverRtpPacket(
+ MediaType media_type,
+ RtpPacketReceived packet,
+ OnUndemuxablePacketHandler undemuxable_packet_handler) {
+ // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
+ // FakeNetworkPipe.
+ RTC_CHECK_NOTREACHED();
+ }
+
protected:
virtual ~PacketReceiver() {}
};
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 5de77fe..622870c 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2978,10 +2978,11 @@
int64_t packet_time_us)>
PeerConnection::InitializeRtcpCallback() {
RTC_DCHECK_RUN_ON(network_thread());
- return [this](const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) {
- RTC_DCHECK_RUN_ON(network_thread());
- call_ptr_->Receiver()->DeliverPacket(MediaType::ANY, packet,
- packet_time_us);
+ return [this](const rtc::CopyOnWriteBuffer& packet,
+ int64_t /*packet_time_us*/) {
+ worker_thread()->PostTask(SafeTask(worker_thread_safety_, [this, packet]() {
+ call_ptr_->Receiver()->DeliverRtcpPacket(packet);
+ }));
};
}