Move payload type demuxing management to RtpTransport

Shift the responsibility for enabling and disabling payload type
demuxing from the Channel and Transceiver layers to the RtpTransport and
RtpDemuxer layer. This centralizes demuxing logic within the transport
layer.

Moving this logic to RtpTransport allows the SdpOfferAnswerHandler to
apply state changes directly to the transport and not need to go through
the Channel classes.

Key modifications:
- Remove SetPayloadTypeDemuxingEnabled from BaseChannel,
  ChannelInterface, and RtpTransceiver.
- Implement SetActivePayloadTypeDemuxing in RtpTransport to manage
  demuxer criteria filtering and coordinate with RtpDemuxer.
- Update SdpOfferAnswerHandler to apply demuxing state changes to
  RtpTransport instances directly.
- Ensure ResetUnsignaledRecvStream executes on the worker thread
  after applying transport state, to clear legacy streams when
  demuxing is disabled.

Bug: webrtc:42222117
Change-Id: If02194417dc8e4c3dcf5fda5e969aa81116004e1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/456720
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#47318}
diff --git a/call/rtp_demuxer.cc b/call/rtp_demuxer.cc
index c49d4fc..598764b 100644
--- a/call/rtp_demuxer.cc
+++ b/call/rtp_demuxer.cc
@@ -400,8 +400,13 @@
     return ssrc_sink_it->second;
   }
 
-  // Legacy senders will only signal payload type, support that as last resort.
-  return ResolveSinkByPayloadType(packet.PayloadType(), ssrc);
+  if (use_payload_type_demuxing_) {
+    // Legacy senders will only signal payload type.
+    // Support that as a last resort.
+    return ResolveSinkByPayloadType(packet.PayloadType(), ssrc);
+  }
+
+  return nullptr;
 }
 
 RtpPacketSinkInterface* RtpDemuxer::ResolveSinkByMid(absl::string_view mid,
diff --git a/call/rtp_demuxer.h b/call/rtp_demuxer.h
index acc985f..cc6a84c 100644
--- a/call/rtp_demuxer.h
+++ b/call/rtp_demuxer.h
@@ -136,6 +136,10 @@
   RtpDemuxer(const RtpDemuxer&) = delete;
   void operator=(const RtpDemuxer&) = delete;
 
+  void set_use_payload_type_demuxing(bool enable) {
+    use_payload_type_demuxing_ = enable;
+  }
+
   // Registers a sink that will be notified when RTP packets match its given
   // criteria according to the algorithm described in the class description.
   // Returns true if the sink was successfully added.
@@ -230,6 +234,7 @@
   void AddSsrcSinkBinding(uint32_t ssrc, RtpPacketSinkInterface* sink);
 
   const bool use_mid_;
+  bool use_payload_type_demuxing_ = true;
 };
 
 }  // namespace webrtc
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index 172c7a1..0e6ce01 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -3143,6 +3143,11 @@
 }
 
 void WebRtcVideoReceiveChannel::ResetUnsignaledRecvStream() {
+  if (!worker_thread_->IsCurrent()) {
+    worker_thread_->PostTask(SafeTask(
+        task_safety_.flag(), [this]() { ResetUnsignaledRecvStream(); }));
+    return;
+  }
   RTC_DCHECK_RUN_ON(&thread_checker_);
   RTC_LOG(LS_INFO) << "ResetUnsignaledRecvStream.";
   unsignaled_stream_params_ = StreamParams();
diff --git a/media/engine/webrtc_video_engine.h b/media/engine/webrtc_video_engine.h
index d497e42..334a871 100644
--- a/media/engine/webrtc_video_engine.h
+++ b/media/engine/webrtc_video_engine.h
@@ -711,6 +711,7 @@
   // Variables.
   const Environment env_;
   TaskQueueBase* const worker_thread_;
+  ScopedTaskSafety task_safety_;
   scoped_refptr<PendingTaskSafetyFlag> network_thread_safety_;
   RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_{
       SequenceChecker::kDetached};
diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc
index 0448da0..4000fb1 100644
--- a/media/engine/webrtc_voice_engine.cc
+++ b/media/engine/webrtc_voice_engine.cc
@@ -2495,6 +2495,11 @@
 }
 
 void WebRtcVoiceReceiveChannel::ResetUnsignaledRecvStream() {
+  if (!worker_thread_->IsCurrent()) {
+    worker_thread_->PostTask(SafeTask(
+        task_safety_.flag(), [this]() { ResetUnsignaledRecvStream(); }));
+    return;
+  }
   RTC_DCHECK_RUN_ON(worker_thread_);
   RTC_LOG(LS_INFO) << "ResetUnsignaledRecvStream.";
   unsignaled_stream_params_ = StreamParams();
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 91cc293..322488a 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -617,6 +617,7 @@
     "../rtc_base/network:sent_packet",
     "../rtc_base/system:no_unique_address",
     "//third_party/abseil-cpp/absl/algorithm:container",
+    "//third_party/abseil-cpp/absl/container:flat_hash_map",
     "//third_party/abseil-cpp/absl/strings:string_view",
   ]
 }
@@ -1150,6 +1151,7 @@
     ":rtp_sender_proxy",
     ":rtp_transceiver",
     ":rtp_transmission_manager",
+    ":rtp_transport_internal",
     ":scoped_operations_batcher",
     ":sdp_munging_detector",
     ":sdp_payload_type_suggester",
diff --git a/pc/channel.cc b/pc/channel.cc
index 610f507..59921fa 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -338,42 +338,6 @@
   return SetRemoteContent_w(content, type);
 }
 
-bool BaseChannel::SetPayloadTypeDemuxingEnabled(bool enabled) {
-  // TODO(bugs.webrtc.org/11993): The demuxer state needs to be managed on the
-  // network thread. At the moment there's a workaround for inconsistent state
-  // between the worker and network thread because of this (see
-  // OnDemuxerCriteriaUpdatePending elsewhere in this file) and
-  // UpdateRemoteStreams_w has a BlockingCall over to the network
-  // thread to apply state updates.
-  RTC_DCHECK_RUN_ON(network_thread());
-  TRACE_EVENT0("webrtc", "BaseChannel::SetPayloadTypeDemuxingEnabled");
-
-  if (enabled == payload_type_demuxing_enabled_) {
-    return true;
-  }
-
-  payload_type_demuxing_enabled_ = enabled;
-
-  if (!enabled) {
-    worker_thread_->PostTask(SafeTask(alive_, [this] {
-      // TODO(crbug.com/11477): This will remove *all* unsignaled streams (those
-      // without an explicitly signaled SSRC), which may include streams that
-      // were matched to this channel by MID or RID. Ideally we'd remove only
-      // the streams that were matched based on payload type alone, but
-      // currently there is no straightforward way to identify those streams.
-      media_receive_channel()->ResetUnsignaledRecvStream();
-    }));
-  }
-
-  if (!payload_types_.empty()) {
-    if (!rtp_transport_) {
-      return false;
-    }
-    return rtp_transport_->RegisterRtpDemuxerSink(demuxer_criteria(), this);
-  }
-
-  return true;
-}
 
 bool BaseChannel::IsReadyToSendMedia_w() const {
   // Send outgoing data if we are enabled, have local and remote content,
@@ -661,9 +625,7 @@
 RtpDemuxerCriteria BaseChannel::demuxer_criteria() const {
   RTC_DCHECK_RUN_ON(network_thread());
   RtpDemuxerCriteria criteria(mid_);
-  if (payload_type_demuxing_enabled_) {
-    criteria.payload_types() = payload_types_;
-  }
+  criteria.payload_types() = payload_types_;
   criteria.ssrcs() = ssrcs_;
   return criteria;
 }
diff --git a/pc/channel.h b/pc/channel.h
index fcb0126..a19950fb 100644
--- a/pc/channel.h
+++ b/pc/channel.h
@@ -127,16 +127,6 @@
                            SdpType type) override;
   RTCError SetRemoteContent(const MediaContentDescription* content,
                             SdpType type) override;
-  // Controls whether this channel will receive packets on the basis of
-  // matching payload type alone. This is needed for legacy endpoints that
-  // don't signal SSRCs or use MID/RID, but doesn't make sense if there is
-  // more than channel of specific media type, As that creates an ambiguity.
-  //
-  // This method will also remove any existing streams that were bound to this
-  // channel on the basis of payload type, since one of these streams might
-  // actually belong to a new channel. See: crbug.com/webrtc/11477
-  bool SetPayloadTypeDemuxingEnabled(bool enabled) override
-      RTC_RUN_ON(network_thread());
 
   void Enable(bool enable) override;
 
@@ -344,7 +334,6 @@
   // call to the worker thread, so it should be safe.
   bool enabled_ RTC_GUARDED_BY(worker_thread()) = false;
   bool enabled_s_ RTC_GUARDED_BY(signaling_thread()) = false;
-  bool payload_type_demuxing_enabled_ RTC_GUARDED_BY(network_thread()) = true;
   std::vector<StreamParams> local_streams_ RTC_GUARDED_BY(worker_thread());
   std::vector<StreamParams> remote_streams_ RTC_GUARDED_BY(worker_thread());
   RtpTransceiverDirection local_content_direction_
diff --git a/pc/channel_interface.h b/pc/channel_interface.h
index 97fed01..dfbe749 100644
--- a/pc/channel_interface.h
+++ b/pc/channel_interface.h
@@ -91,7 +91,6 @@
                                    SdpType type) = 0;
   virtual RTCError SetRemoteContent(const MediaContentDescription* content,
                                     SdpType type) = 0;
-  virtual bool SetPayloadTypeDemuxingEnabled(bool enabled) = 0;
 
   // Access to the local and remote streams that were set on the channel.
   virtual const std::vector<StreamParams>& local_streams() const = 0;
diff --git a/pc/rtp_transceiver.cc b/pc/rtp_transceiver.cc
index 946601a..58da29f 100644
--- a/pc/rtp_transceiver.cc
+++ b/pc/rtp_transceiver.cc
@@ -1372,19 +1372,19 @@
             });
       });
 }
-
-bool RtpTransceiver::SetChannelPayloadTypeDemuxingEnabled(bool enabled) {
-  RTC_DCHECK_RUN_ON(context()->network_thread());
-  RTC_DCHECK(channel_);
-  return channel_->SetPayloadTypeDemuxingEnabled(enabled);
-}
-
 void RtpTransceiver::EnableChannel(bool enable) {
   RTC_DCHECK_RUN_ON(thread_);
   RTC_DCHECK(channel_);
   channel_->Enable(enable);
 }
 
+void RtpTransceiver::ResetUnsignaledRecvStream() {
+  RTC_DCHECK_RUN_ON(thread_);
+  if (MediaReceiveChannelInterface* receive_channel = media_receive_channel()) {
+    receive_channel->ResetUnsignaledRecvStream();
+  }
+}
+
 const std::vector<StreamParams>& RtpTransceiver::channel_local_streams() const {
   RTC_DCHECK_RUN_ON(thread_);
   RTC_DCHECK(channel_);
diff --git a/pc/rtp_transceiver.h b/pc/rtp_transceiver.h
index a69d17b..86e76f8 100644
--- a/pc/rtp_transceiver.h
+++ b/pc/rtp_transceiver.h
@@ -392,8 +392,9 @@
   void SetChannelRemoteContent(const MediaContentDescription* content,
                                SdpType type,
                                ScopedOperationsBatcher& batcher);
-  bool SetChannelPayloadTypeDemuxingEnabled(bool enabled);
   void EnableChannel(bool enable);
+  void ResetUnsignaledRecvStream();
+
   const std::vector<StreamParams>& channel_local_streams() const;
   const std::vector<StreamParams>& channel_remote_streams() const;
   absl::string_view channel_transport_name() const;
diff --git a/pc/rtp_transport.cc b/pc/rtp_transport.cc
index 20db5d3..05ef651 100644
--- a/pc/rtp_transport.cc
+++ b/pc/rtp_transport.cc
@@ -341,9 +341,14 @@
   header_extension_map_ = std::move(merged_map);
 }
 
+void RtpTransport::SetActivePayloadTypeDemuxing(bool enabled) {
+  rtp_demuxer_.set_use_payload_type_demuxing(enabled);
+}
+
 bool RtpTransport::RegisterRtpDemuxerSink(const RtpDemuxerCriteria& criteria,
                                           RtpPacketSinkInterface* sink) {
   rtp_demuxer_.RemoveSink(sink);
+
   if (!rtp_demuxer_.AddSink(criteria, sink)) {
     RTC_LOG(LS_ERROR) << "Failed to register the sink for RTP demuxer.";
     return false;
diff --git a/pc/rtp_transport.h b/pc/rtp_transport.h
index 1b0d65b..e51a124 100644
--- a/pc/rtp_transport.h
+++ b/pc/rtp_transport.h
@@ -112,6 +112,8 @@
 
   bool UnregisterRtpDemuxerSink(RtpPacketSinkInterface* sink) override;
 
+  void SetActivePayloadTypeDemuxing(bool enabled) override;
+
  protected:
   // These methods will be used in the subclasses.
   void DemuxPacket(CopyOnWriteBuffer packet,
diff --git a/pc/rtp_transport_internal.h b/pc/rtp_transport_internal.h
index 96aaa3e..bb4b669 100644
--- a/pc/rtp_transport_internal.h
+++ b/pc/rtp_transport_internal.h
@@ -151,6 +151,8 @@
 
   virtual bool UnregisterRtpDemuxerSink(RtpPacketSinkInterface* sink) = 0;
 
+  virtual void SetActivePayloadTypeDemuxing(bool enabled) = 0;
+
  protected:
   void SendReadyToSend(bool arg) { callback_list_ready_to_send_.Send(arg); }
   void SendRtcpPacketReceived(CopyOnWriteBuffer packet,
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index 9ef845c..7a93480 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -86,6 +86,7 @@
 #include "pc/rtp_sender_proxy.h"
 #include "pc/rtp_transceiver.h"
 #include "pc/rtp_transmission_manager.h"
+#include "pc/rtp_transport_internal.h"
 #include "pc/scoped_operations_batcher.h"
 #include "pc/sdp_munging_detector.h"
 #include "pc/session_description.h"
@@ -5165,15 +5166,9 @@
   RTC_DCHECK(sdesc);
 
   if (ConfiguredForMedia()) {
-    // Note: This will perform a BlockingCall over to the worker thread, which
+    // Note: This may perform a BlockingCall over to the network thread, which
     // we'll also do in a loop below.
-    if (!UpdatePayloadTypeDemuxingState(source, bundle_groups_by_mid)) {
-      // Note that this is never expected to fail, since RtpDemuxer doesn't
-      // return an error when changing payload type demux criteria, which is all
-      // this does.
-      return LOG_ERROR(RTCError(RTCErrorType::INTERNAL_ERROR)
-                       << "Failed to update payload type demuxing state.");
-    }
+    UpdatePayloadTypeDemuxingState(source, bundle_groups_by_mid);
 
     // Push down the new SDP media section for each audio/video transceiver.
     auto rtp_transceivers = transceivers()->ListInternal();
@@ -5778,7 +5773,7 @@
   return options;
 }
 
-bool SdpOfferAnswerHandler::UpdatePayloadTypeDemuxingState(
+void SdpOfferAnswerHandler::UpdatePayloadTypeDemuxingState(
     ContentSource source,
     const flat_map<std::string, const ContentGroup*>& bundle_groups_by_mid) {
   TRACE_EVENT0("webrtc",
@@ -5879,13 +5874,13 @@
                                         mid_header_extension_missing_video ||
                                         pt_demuxing_has_been_used_video_;
 
-  // Gather all updates ahead of time so that all channels can be updated in a
+  // Gather all updates ahead of time so that all transports can be updated in a
   // single BlockingCall; necessary due to thread guards.
-  std::vector<std::pair<bool, RtpTransceiver*>> channels_to_update;
-  for (const auto& transceiver : transceivers()->ListInternal()) {
+  flat_map<std::string, bool> transports_to_update;
+  for (RtpTransceiver* transceiver : transceivers()->ListInternal()) {
     const ContentInfo* content =
         FindMediaSectionForTransceiver(transceiver, sdesc);
-    if (!transceiver->HasChannel() || !content) {
+    if (!content) {
       continue;
     }
 
@@ -5923,26 +5918,53 @@
       }
     }
 
-    channels_to_update.emplace_back(pt_demux_enabled, transceiver);
+    // Accumulate the pt_demux_enabled state per transport (represented by mid).
+    // If any transceiver associated with the transport requires PT demuxing,
+    // the transport will have it enabled.
+    transports_to_update[content->mid()] |= pt_demux_enabled;
   }
 
-  if (channels_to_update.empty()) {
-    return true;
+  if (transports_to_update.empty()) {
+    return;
   }
 
-  // TODO(bugs.webrtc.org/11993): The demuxer state is now fully managed on
-  // the network thread. So we do a single blocking call here to the network
-  // thread. Ideally we could also do this without blocking.
-  return context_->network_thread()->BlockingCall([&channels_to_update]() {
-    for (const auto& it : channels_to_update) {
-      if (!it.second->SetChannelPayloadTypeDemuxingEnabled(it.first)) {
-        // Note that the state has already been irrevocably changed at this
-        // point. Is it useful to stop the loop?
-        return false;
+  context_->network_thread()->BlockingCall([&]() {
+    JsepTransportController* transport_controller =
+        pc_->transport_controller_n();
+    for (const auto& [mid, pt_demux_enabled] : transports_to_update) {
+      RtpTransportInternal* rtp_transport =
+          transport_controller->GetRtpTransport(mid);
+      if (rtp_transport != nullptr) {
+        rtp_transport->SetActivePayloadTypeDemuxing(pt_demux_enabled);
       }
     }
-    return true;
   });
+
+  // If payload type demuxing is disabled for a transport, clear out any
+  // unsignaled streams that might have been created previously based on
+  // payload type matches. This prevents SSRC collisions if those SSRCs
+  // are later reused.
+  // TODO(tommi): Could/should we rather issue the calls to the transceivers
+  // on the network thread? This will post tasks per transceiver to the worker
+  // thread. A common configuration will have joined worker and network threads,
+  // so bundling with the network thread may be more efficient.
+  for (RtpTransceiver* transceiver : transceivers()->ListInternal()) {
+    const ContentInfo* content =
+        FindMediaSectionForTransceiver(transceiver, sdesc);
+    if (!content) {
+      continue;
+    }
+    auto it = transports_to_update.find(content->mid());
+    if (it != transports_to_update.end() && !it->second) {
+      // TODO: bugs.webrtc.org/42221580 - This will remove *all* unsignaled
+      // streams (those without an explicitly signaled SSRC), which may include
+      // streams that were matched to this channel by MID or RID. Ideally we'd
+      // remove only the streams that were matched based on payload type alone,
+      // but currently there is no straightforward way to identify those
+      // streams.
+      transceiver->ResetUnsignaledRecvStream();
+    }
+  }
 }
 
 bool SdpOfferAnswerHandler::ConfiguredForMedia() const {
diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h
index 7be0f66..802f518 100644
--- a/pc/sdp_offer_answer.h
+++ b/pc/sdp_offer_answer.h
@@ -574,7 +574,7 @@
 
   // Based on number of transceivers per media type, enabled or disable
   // payload type based demuxing in the affected channels.
-  bool UpdatePayloadTypeDemuxingState(
+  void UpdatePayloadTypeDemuxingState(
       ContentSource source,
       const flat_map<std::string, const ContentGroup*>& bundle_groups_by_mid);
 
diff --git a/pc/test/mock_channel_interface.h b/pc/test/mock_channel_interface.h
index b87ab9d..2848905 100644
--- a/pc/test/mock_channel_interface.h
+++ b/pc/test/mock_channel_interface.h
@@ -81,7 +81,6 @@
               SetRemoteContent,
               (const webrtc::MediaContentDescription*, SdpType),
               (override));
-  MOCK_METHOD(bool, SetPayloadTypeDemuxingEnabled, (bool), (override));
   MOCK_METHOD(const std::vector<StreamParams>&,
               local_streams,
               (),