Reland "(Un/)Subscribe RtpVideoSender for feedback on the transport queue."

This is a reland of 9d230d54c7eef31ac1100f0aeef1374dd1ac62fa

Original change's description:
> (Un/)Subscribe RtpVideoSender for feedback on the transport queue.
>
> * RtpVideoSender now registers/unregisters for feedback callback
>   inside of SetActive(), which runs on the transport queue.
> * Transport feedback is given on the transport queue
> * Registration/unregistration for feedback is done on the same
> * Removed the last mutex from TransportFeedbackDemuxer.
>
> Ultimately, this work is related to moving state from the Call
> class, that's related to network configuration, but due to the code
> is currently written is attached to the worker thread, over to the
> Transport, where it's used (e.g. suspended_video_send_ssrcs_).
>
> Bug: webrtc:13517, webrtc:11993
> Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#35777}

Bug: webrtc:13517, webrtc:11993
Change-Id: I766e569abea8bae96d32267a951fcdc195ced8a7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/249782
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35863}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index fc4f483..230b048 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -572,10 +572,10 @@
 
 void RtpTransportControllerSend::OnTransportFeedback(
     const rtcp::TransportFeedback& feedback) {
-  feedback_demuxer_.OnTransportFeedback(feedback);
   auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
   task_queue_.PostTask([this, feedback, feedback_time]() {
     RTC_DCHECK_RUN_ON(&task_queue_);
+    feedback_demuxer_.OnTransportFeedback(feedback);
     absl::optional<TransportPacketsFeedback> feedback_msg =
         transport_feedback_adapter_.ProcessTransportFeedback(feedback,
                                                              feedback_time);
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 78cf281..35e6bee 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -445,9 +445,6 @@
   fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled());
 
   fec_controller_->SetProtectionCallback(this);
-  // Signal congestion controller this object is ready for OnPacket* callbacks.
-  transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
-      rtp_config_.ssrcs, this);
 
   // Construction happens on the worker thread (see Call::CreateVideoSendStream)
   // but subseqeuent calls to the RTP state will happen on one of two threads:
@@ -466,8 +463,8 @@
 
   SetActiveModulesLocked(
       std::vector<bool>(rtp_streams_.size(), /*active=*/false));
-  transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver(
-      this);
+
+  RTC_DCHECK(!registered_for_feedback_);
 }
 
 void RtpVideoSender::SetActive(bool active) {
@@ -475,8 +472,18 @@
   MutexLock lock(&mutex_);
   if (active_ == active)
     return;
+
   const std::vector<bool> active_modules(rtp_streams_.size(), active);
   SetActiveModulesLocked(active_modules);
+
+  auto* feedback_provider = transport_->GetStreamFeedbackProvider();
+  if (active && !registered_for_feedback_) {
+    feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this);
+    registered_for_feedback_ = true;
+  } else if (!active && registered_for_feedback_) {
+    feedback_provider->DeRegisterStreamFeedbackObserver(this);
+    registered_for_feedback_ = false;
+  }
 }
 
 void RtpVideoSender::SetActiveModules(const std::vector<bool> active_modules) {
diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h
index ea0d1ee..7023804 100644
--- a/call/rtp_video_sender.h
+++ b/call/rtp_video_sender.h
@@ -180,6 +180,7 @@
   // transport task queue.
   mutable Mutex mutex_;
   bool active_ RTC_GUARDED_BY(mutex_);
+  bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false;
 
   const std::unique_ptr<FecController> fec_controller_;
   bool fec_allowed_ RTC_GUARDED_BY(mutex_);
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index 0644556..c47717d 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -13,6 +13,7 @@
 #include <atomic>
 #include <memory>
 #include <string>
+#include <utility>
 
 #include "call/rtp_transport_controller_send.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@@ -177,10 +178,33 @@
                                   /*frame_count_observer=*/nullptr,
                                   /*frame_transformer=*/nullptr) {}
 
+  ~RtpVideoSenderTestFixture() { SetActive(false); }
+
   RtpVideoSender* router() { return router_.get(); }
   MockTransport& transport() { return transport_; }
   void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
 
+  void SetActive(bool active) {
+    RunOnTransportQueue([&]() { router_->SetActive(active); });
+  }
+
+  void SetActiveModules(const std::vector<bool>& active_modules) {
+    RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); });
+  }
+
+  // Several RtpVideoSender methods expect to be called on the task queue as
+  // owned by the send transport. While the SequenceChecker may pick up the
+  // default thread as the transport queue, explicit checks for the transport
+  // queue (not just using a SequenceChecker) aren't possible unless such a
+  // queue is actually active. So RunOnTransportQueue is a convenience function
+  // that allow for running a closure on the transport queue, similar to
+  // SendTask().
+  template <typename Closure>
+  void RunOnTransportQueue(Closure&& task) {
+    transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
+    AdvanceTime(TimeDelta::Millis(0));
+  }
+
  private:
   NiceMock<MockTransport> transport_;
   NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
@@ -217,15 +241,15 @@
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(false);
+  test.SetActive(false);
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 }
@@ -244,7 +268,7 @@
   CodecSpecificInfo codec_info;
   codec_info.codecType = kVideoCodecVP8;
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
 
@@ -254,7 +278,7 @@
             test.router()->OnEncodedImage(encoded_image_2, &codec_info).error);
 
   // Inactive.
-  test.router()->SetActive(false);
+  test.SetActive(false);
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
   EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -284,14 +308,14 @@
   // Only setting one stream to active will still set the payload router to
   // active and allow sending data on the active stream.
   std::vector<bool> active_modules({true, false});
-  test.router()->SetActiveModules(active_modules);
+  test.SetActiveModules(active_modules);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
 
   // Setting both streams to inactive will turn the payload router to
   // inactive.
   active_modules = {false, false};
-  test.router()->SetActiveModules(active_modules);
+  test.SetActiveModules(active_modules);
   // An incoming encoded image will not ask the module to send outgoing data
   // because the payload router is inactive.
   EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -303,7 +327,7 @@
 TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   std::map<uint32_t, RtpPayloadState> initial_states =
       test.router()->GetRtpPayloadStates();
@@ -328,7 +352,7 @@
 
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, states);
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   std::map<uint32_t, RtpPayloadState> initial_states =
       test.router()->GetRtpPayloadStates();
@@ -368,7 +392,7 @@
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
   ::testing::Mock::VerifyAndClearExpectations(&callback);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   FrameCounts frame_counts;
   EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1))
@@ -397,7 +421,7 @@
 TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   constexpr uint8_t kPayload = 'a';
   EncodedImage encoded_image;
@@ -496,8 +520,8 @@
 }
 
 // This tests that we utilize transport wide feedback to retransmit lost
-// packets. This is tested by dropping all ordirary packets from a "lossy"
-// stream send along with an secondary untouched stream. The transport wide
+// packets. This is tested by dropping all ordinary packets from a "lossy"
+// stream sent along with a secondary untouched stream. The transport wide
 // feedback packets from the secondary stream allows the sending side to
 // detect and retreansmit the lost packets from the lossy stream.
 TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
@@ -562,7 +586,7 @@
 TEST(RtpVideoSenderTest, EarlyRetransmits) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   const uint8_t kPayload[1] = {'a'};
   EncodedImage encoded_image;
@@ -657,7 +681,7 @@
 
 TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -717,7 +741,7 @@
 
 TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -773,7 +797,7 @@
 TEST(RtpVideoSenderTest,
      SupportsDependencyDescriptorForVp9NotProvidedByEncoder) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -828,7 +852,7 @@
   test::ScopedFieldTrials field_trials(
       "WebRTC-GenericCodecDependencyDescriptor/Enabled/");
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -874,7 +898,7 @@
 
 TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -932,7 +956,7 @@
 TEST(RtpVideoSenderTest,
      SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -1017,7 +1041,7 @@
       kRtpHeaderSizeBytes + kTransportPacketOverheadBytes;
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
   test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes);
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   {
     test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000),
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
index 29accb5..50987b2 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
@@ -44,11 +44,7 @@
 }
 
 void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
-  // Currently called on the send transport queue.
-  // TODO(tommi): When registration/unregistration as well as
-  // OnTransportFeedback callbacks occur on the transport queue, we can remove
-  // this lock.
-  MutexLock lock(&lock_);
+  RTC_DCHECK_RUN_ON(&observer_checker_);
 
   StreamFeedbackObserver::StreamPacketInfo info;
   info.ssrc = packet_info.media_ssrc;
@@ -66,24 +62,22 @@
 
 void TransportFeedbackDemuxer::OnTransportFeedback(
     const rtcp::TransportFeedback& feedback) {
+  RTC_DCHECK_RUN_ON(&observer_checker_);
+
   std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
-  {
-    MutexLock lock(&lock_);
-    for (const auto& packet : feedback.GetAllPackets()) {
-      int64_t seq_num =
-          seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
-      auto it = history_.find(seq_num);
-      if (it != history_.end()) {
-        auto packet_info = it->second;
-        packet_info.received = packet.received();
-        stream_feedbacks.push_back(std::move(packet_info));
-        if (packet.received())
-          history_.erase(it);
-      }
+  for (const auto& packet : feedback.GetAllPackets()) {
+    int64_t seq_num =
+        seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
+    auto it = history_.find(seq_num);
+    if (it != history_.end()) {
+      auto packet_info = it->second;
+      packet_info.received = packet.received();
+      stream_feedbacks.push_back(std::move(packet_info));
+      if (packet.received())
+        history_.erase(it);
     }
   }
 
-  RTC_DCHECK_RUN_ON(&observer_checker_);
   for (auto& observer : observers_) {
     std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
     for (const auto& packet_info : stream_feedbacks) {
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
index 895288f..7f4f575 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
@@ -17,7 +17,6 @@
 #include "api/sequence_checker.h"
 #include "modules/include/module_common_types_public.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
-#include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/system/no_unique_address.h"
 
 namespace webrtc {
@@ -45,15 +44,14 @@
   void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
 
  private:
-  Mutex lock_;
-  SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
+  SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_);
   std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
-      RTC_GUARDED_BY(&lock_);
+      RTC_GUARDED_BY(&observer_checker_);
 
   // Maps a set of ssrcs to corresponding observer. Vectors are used rather than
   // set/map to ensure that the processing order is consistent independently of
   // the randomized ssrcs.
-  RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
   std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
       observers_ RTC_GUARDED_BY(&observer_checker_);
 };