Introduce PacketReceiver::DeliverRtpPacket and PacketReceier::DeliverRtcpPacket

DeliverRtpPacket use a parsed RTP packet as argument where the RTP extensions are supposed to be known.
This method is implemented in webrt::Call and temporary used by the exising method  Call::DeliverRtp, but the idea is to instead avoid extra packet parsing by forwarding a RtpPacketReceived from RtpTransport::DemuxRtpPacket via  WebrtcVideoChannel::OnPacketReceived and WebrtcVoiceChannel.

DeliverRtcpPacket is also implemented in Call and is directly used in PeerConnection::InitializeRtcpCallback.

Bug: webrtc:14795, webrtc:7135
Change-Id: Ib6ffe8e1229ac07fa459ee2fc9a0af8455a23bac
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/290401
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39015}
diff --git a/call/call.cc b/call/call.cc
index 3b2283d..218505c 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -23,6 +23,7 @@
 #include "absl/functional/bind_front.h"
 #include "absl/strings/string_view.h"
 #include "absl/types/optional.h"
+#include "api/media_types.h"
 #include "api/rtc_event_log/rtc_event_log.h"
 #include "api/sequence_checker.h"
 #include "api/task_queue/pending_task_safety_flag.h"
@@ -33,6 +34,7 @@
 #include "call/adaptation/broadcast_resource_listener.h"
 #include "call/bitrate_allocator.h"
 #include "call/flexfec_receive_stream_impl.h"
+#include "call/packet_receiver.h"
 #include "call/receive_time_calculator.h"
 #include "call/rtp_stream_receiver_controller.h"
 #include "call/rtp_transport_controller_send.h"
@@ -244,6 +246,13 @@
                                rtc::CopyOnWriteBuffer packet,
                                int64_t packet_time_us) override;
 
+  void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
+
+  void DeliverRtpPacket(
+      MediaType media_type,
+      RtpPacketReceived packet,
+      OnUndemuxablePacketHandler undemuxable_packet_handler) override;
+
   void SignalChannelNetworkState(MediaType media, NetworkState state) override;
 
   void OnAudioTransportOverheadChanged(
@@ -1375,60 +1384,102 @@
   }
 }
 
-void Call::DeliverRtcp(MediaType media_type, rtc::CopyOnWriteBuffer packet) {
-  RTC_DCHECK_RUN_ON(network_thread_);
+void Call::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+  RTC_DCHECK_RUN_ON(worker_thread_);
+  RTC_DCHECK(IsRtcpPacket(packet));
   TRACE_EVENT0("webrtc", "Call::DeliverRtcp");
 
-  // TODO(bugs.webrtc.org/11993): This DCHECK is here just to maintain the
-  // invariant that currently the only call path to this function is via
-  // `PeerConnection::InitializeRtcpCallback()`. DeliverRtp on the other hand
-  // gets called via the channel classes and
-  // WebRtc[Audio|Video]Channel's `OnPacketReceived`. We'll remove the
-  // PeerConnection involvement as well as
-  // `JsepTransportController::OnRtcpPacketReceived_n` and `rtcp_handler`
-  // and make sure that the flow of packets is consistent from the
-  // `RtpTransport` class, via the *Channel and *Engine classes and into Call.
-  // This way we'll also know more about the context of the packet.
-  RTC_DCHECK_EQ(media_type, MediaType::ANY);
+  receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
+  bool rtcp_delivered = false;
+  for (VideoReceiveStream2* stream : video_receive_streams_) {
+    if (stream->DeliverRtcp(packet.cdata(), packet.size()))
+      rtcp_delivered = true;
+  }
 
-  // TODO(bugs.webrtc.org/11993): This should execute directly on the network
-  // thread.
-  worker_thread_->PostTask(
-      SafeTask(task_safety_.flag(), [this, packet = std::move(packet)]() {
-        RTC_DCHECK_RUN_ON(worker_thread_);
+  for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
+    stream->DeliverRtcp(packet.cdata(), packet.size());
+    rtcp_delivered = true;
+  }
 
-        receive_stats_.AddReceivedRtcpBytes(static_cast<int>(packet.size()));
-        bool rtcp_delivered = false;
-        for (VideoReceiveStream2* stream : video_receive_streams_) {
-          if (stream->DeliverRtcp(packet.cdata(), packet.size()))
-            rtcp_delivered = true;
-        }
+  for (VideoSendStream* stream : video_send_streams_) {
+    stream->DeliverRtcp(packet.cdata(), packet.size());
+    rtcp_delivered = true;
+  }
 
-        for (AudioReceiveStreamImpl* stream : audio_receive_streams_) {
-          stream->DeliverRtcp(packet.cdata(), packet.size());
-          rtcp_delivered = true;
-        }
+  for (auto& kv : audio_send_ssrcs_) {
+    kv.second->DeliverRtcp(packet.cdata(), packet.size());
+    rtcp_delivered = true;
+  }
 
-        for (VideoSendStream* stream : video_send_streams_) {
-          stream->DeliverRtcp(packet.cdata(), packet.size());
-          rtcp_delivered = true;
-        }
+  if (rtcp_delivered) {
+    event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(packet));
+  }
+}
 
-        for (auto& kv : audio_send_ssrcs_) {
-          kv.second->DeliverRtcp(packet.cdata(), packet.size());
-          rtcp_delivered = true;
-        }
+void Call::DeliverRtpPacket(
+    MediaType media_type,
+    RtpPacketReceived packet,
+    OnUndemuxablePacketHandler undemuxable_packet_handler) {
+  RTC_DCHECK_RUN_ON(worker_thread_);
+  RTC_DCHECK(packet.arrival_time().IsFinite());
 
-        if (rtcp_delivered) {
-          event_log_->Log(std::make_unique<RtcEventRtcpPacketIncoming>(
-              rtc::MakeArrayView(packet.cdata(), packet.size())));
-        }
-      }));
+  if (receive_time_calculator_) {
+    int64_t packet_time_us = packet.arrival_time().us();
+    // Repair packet_time_us for clock resets by comparing a new read of
+    // the same clock (TimeUTCMicros) to a monotonic clock reading.
+    packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
+        packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
+    packet.set_arrival_time(Timestamp::Micros(packet_time_us));
+  }
+
+  // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
+  // These are empty (zero length payload) RTP packets with an unsignaled
+  // payload type.
+  const bool is_keep_alive_packet = packet.payload_size() == 0;
+  RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
+             is_keep_alive_packet);
+  NotifyBweOfReceivedPacket(packet, media_type);
+
+  if (media_type != MediaType::AUDIO && media_type != MediaType::VIDEO) {
+    RTC_DCHECK(is_keep_alive_packet);
+    return;
+  }
+
+  RtpStreamReceiverController& receiver_controller =
+      media_type == MediaType::AUDIO ? audio_receiver_controller_
+                                     : video_receiver_controller_;
+
+  if (!receiver_controller.OnRtpPacket(packet)) {
+    // Demuxing failed.  Allow the caller to create a
+    // receive stream in order to handle unsignalled SSRCs and try again.
+    // Note that we dont want to call NotifyBweOfReceivedPacket twice per
+    // packet.
+    if (!undemuxable_packet_handler(packet)) {
+      return;
+    }
+    if (!receiver_controller.OnRtpPacket(packet)) {
+      RTC_LOG(LS_INFO) << "Failed to demux packet " << packet.Ssrc();
+      return;
+    }
+  }
+  event_log_->Log(std::make_unique<RtcEventRtpPacketIncoming>(packet));
+
+  // RateCounters expect input parameter as int, save it as int,
+  // instead of converting each time it is passed to RateCounter::Add below.
+  int length = static_cast<int>(packet.size());
+  if (media_type == MediaType::AUDIO) {
+    receive_stats_.AddReceivedAudioBytes(length, packet.arrival_time());
+  }
+  if (media_type == MediaType::VIDEO) {
+    receive_stats_.AddReceivedVideoBytes(length, packet.arrival_time());
+  }
 }
 
 PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
                                                 rtc::CopyOnWriteBuffer packet,
                                                 int64_t packet_time_us) {
+  // TODO(perkj, https://bugs.webrtc.org/7135): Deprecate this method and
+  // direcly use DeliverRtpPacket.
   TRACE_EVENT0("webrtc", "Call::DeliverRtp");
   RTC_DCHECK_NE(media_type, MediaType::ANY);
 
@@ -1437,52 +1488,24 @@
     return DELIVERY_PACKET_ERROR;
 
   if (packet_time_us != -1) {
-    if (receive_time_calculator_) {
-      // Repair packet_time_us for clock resets by comparing a new read of
-      // the same clock (TimeUTCMicros) to a monotonic clock reading.
-      packet_time_us = receive_time_calculator_->ReconcileReceiveTimes(
-          packet_time_us, rtc::TimeUTCMicros(), clock_->TimeInMicroseconds());
-    }
     parsed_packet.set_arrival_time(Timestamp::Micros(packet_time_us));
   } else {
     parsed_packet.set_arrival_time(clock_->CurrentTime());
   }
 
-  // We might get RTP keep-alive packets in accordance with RFC6263 section 4.6.
-  // These are empty (zero length payload) RTP packets with an unsignaled
-  // payload type.
-  const bool is_keep_alive_packet = parsed_packet.payload_size() == 0;
-
-  RTC_DCHECK(media_type == MediaType::AUDIO || media_type == MediaType::VIDEO ||
-             is_keep_alive_packet);
-
   if (!IdentifyReceivedPacket(parsed_packet))
     return DELIVERY_UNKNOWN_SSRC;
-
-  NotifyBweOfReceivedPacket(parsed_packet, media_type);
-
-  // RateCounters expect input parameter as int, save it as int,
-  // instead of converting each time it is passed to RateCounter::Add below.
-  int length = static_cast<int>(parsed_packet.size());
-  if (media_type == MediaType::AUDIO) {
-    if (audio_receiver_controller_.OnRtpPacket(parsed_packet)) {
-      receive_stats_.AddReceivedAudioBytes(length,
-                                           parsed_packet.arrival_time());
-      event_log_->Log(
-          std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
-      return DELIVERY_OK;
-    }
-  } else if (media_type == MediaType::VIDEO) {
+  if (media_type == MediaType::VIDEO) {
     parsed_packet.set_payload_type_frequency(kVideoPayloadTypeFrequency);
-    if (video_receiver_controller_.OnRtpPacket(parsed_packet)) {
-      receive_stats_.AddReceivedVideoBytes(length,
-                                           parsed_packet.arrival_time());
-      event_log_->Log(
-          std::make_unique<RtcEventRtpPacketIncoming>(parsed_packet));
-      return DELIVERY_OK;
-    }
   }
-  return DELIVERY_UNKNOWN_SSRC;
+  DeliverRtpPacket(media_type, std::move(parsed_packet),
+                   [](const webrtc::RtpPacketReceived& packet) {
+                     // If IdentifyReceivedPacket returns true, a packet is
+                     // expected to be demuxable.
+                     RTC_DCHECK_NOTREACHED();
+                     return false;
+                   });
+  return DELIVERY_OK;
 }
 
 PacketReceiver::DeliveryStatus Call::DeliverPacket(
@@ -1491,7 +1514,11 @@
     int64_t packet_time_us) {
   if (IsRtcpPacket(packet)) {
     RTC_DCHECK_RUN_ON(network_thread_);
-    DeliverRtcp(media_type, std::move(packet));
+    worker_thread_->PostTask(SafeTask(
+        task_safety_.flag(), [this, packet = std::move(packet)]() mutable {
+          RTC_DCHECK_RUN_ON(worker_thread_);
+          DeliverRtcpPacket(std::move(packet));
+        }));
     return DELIVERY_OK;
   }
 
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index c59a63b..445a1e9 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -414,6 +414,11 @@
   return status;
 }
 
+void DegradedCall::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+  receive_pipe_->DeliverRtcpPacket(std::move(packet));
+  receive_pipe_->Process();
+}
+
 void DegradedCall::SetClientBitratePreferences(
     const webrtc::BitrateSettings& preferences) {
   call_->SetClientBitratePreferences(preferences);
diff --git a/call/degraded_call.h b/call/degraded_call.h
index 5906e55..bff93e2 100644
--- a/call/degraded_call.h
+++ b/call/degraded_call.h
@@ -116,6 +116,7 @@
   DeliveryStatus DeliverPacket(MediaType media_type,
                                rtc::CopyOnWriteBuffer packet,
                                int64_t packet_time_us) override;
+  void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
 
  private:
   class FakeNetworkPipeOnTaskQueue {
diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc
index 8a03e0c..6065064 100644
--- a/call/fake_network_pipe.cc
+++ b/call/fake_network_pipe.cc
@@ -184,6 +184,11 @@
              : PacketReceiver::DELIVERY_PACKET_ERROR;
 }
 
+void FakeNetworkPipe::DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+  EnqueuePacket(std::move(packet), absl::nullopt, true, MediaType::ANY,
+                absl::nullopt);
+}
+
 void FakeNetworkPipe::SetClockOffset(int64_t offset_ms) {
   MutexLock lock(&config_lock_);
   clock_offset_ms_ = offset_ms;
diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h
index be72e91..3a6cb05 100644
--- a/call/fake_network_pipe.h
+++ b/call/fake_network_pipe.h
@@ -149,6 +149,8 @@
                                                rtc::CopyOnWriteBuffer packet,
                                                int64_t packet_time_us) override;
 
+  void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) override;
+
   // TODO(bugs.webrtc.org/9584): Needed to inherit the alternative signature for
   // this method.
   using PacketReceiver::DeliverPacket;
diff --git a/call/packet_receiver.h b/call/packet_receiver.h
index 13d3b84..a97bb96 100644
--- a/call/packet_receiver.h
+++ b/call/packet_receiver.h
@@ -10,7 +10,10 @@
 #ifndef CALL_PACKET_RECEIVER_H_
 #define CALL_PACKET_RECEIVER_H_
 
+#include "absl/functional/any_invocable.h"
 #include "api/media_types.h"
+#include "modules/rtp_rtcp/source/rtp_packet_received.h"
+#include "rtc_base/checks.h"
 #include "rtc_base/copy_on_write_buffer.h"
 
 namespace webrtc {
@@ -27,6 +30,28 @@
                                        rtc::CopyOnWriteBuffer packet,
                                        int64_t packet_time_us) = 0;
 
+  // Demux RTCP packets. Must be called on the worker thread.
+  virtual void DeliverRtcpPacket(rtc::CopyOnWriteBuffer packet) {
+    // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
+    // FakeNetworkPipe.
+    RTC_CHECK_NOTREACHED();
+  }
+
+  // Invoked once when a packet packet is received that can not be demuxed.
+  // If the method returns true, a new attempt is made to demux the packet.
+  using OnUndemuxablePacketHandler =
+      absl::AnyInvocable<bool(const RtpPacketReceived& parsed_packet)>;
+
+  // Demux RTP packets. Must be called on the worker thread.
+  virtual void DeliverRtpPacket(
+      MediaType media_type,
+      RtpPacketReceived packet,
+      OnUndemuxablePacketHandler undemuxable_packet_handler) {
+    // TODO(perkj, https://bugs.webrtc.org/7135): Implement in FakeCall and
+    // FakeNetworkPipe.
+    RTC_CHECK_NOTREACHED();
+  }
+
  protected:
   virtual ~PacketReceiver() {}
 };
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 5de77fe..622870c 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2978,10 +2978,11 @@
                    int64_t packet_time_us)>
 PeerConnection::InitializeRtcpCallback() {
   RTC_DCHECK_RUN_ON(network_thread());
-  return [this](const rtc::CopyOnWriteBuffer& packet, int64_t packet_time_us) {
-    RTC_DCHECK_RUN_ON(network_thread());
-    call_ptr_->Receiver()->DeliverPacket(MediaType::ANY, packet,
-                                         packet_time_us);
+  return [this](const rtc::CopyOnWriteBuffer& packet,
+                int64_t /*packet_time_us*/) {
+    worker_thread()->PostTask(SafeTask(worker_thread_safety_, [this, packet]() {
+      call_ptr_->Receiver()->DeliverRtcpPacket(packet);
+    }));
   };
 }