(Un/)Subscribe RtpVideoSender for feedback on the transport queue.

* RtpVideoSender now registers/unregisters for feedback callback
  inside of SetActive(), which runs on the transport queue.
* Transport feedback is given on the transport queue
* Registration/unregistration for feedback is done on the same
* Removed the last mutex from TransportFeedbackDemuxer.

Ultimately, this work is related to moving state from the Call
class, that's related to network configuration, but due to the code
is currently written is attached to the worker thread, over to the
Transport, where it's used (e.g. suspended_video_send_ssrcs_).

Bug: webrtc:13517, webrtc:11993
Change-Id: I057d0e2597e6cb746b335e0308599cd547350e56
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/248165
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35777}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index fc4f483..230b048 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -572,10 +572,10 @@
 
 void RtpTransportControllerSend::OnTransportFeedback(
     const rtcp::TransportFeedback& feedback) {
-  feedback_demuxer_.OnTransportFeedback(feedback);
   auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());
   task_queue_.PostTask([this, feedback, feedback_time]() {
     RTC_DCHECK_RUN_ON(&task_queue_);
+    feedback_demuxer_.OnTransportFeedback(feedback);
     absl::optional<TransportPacketsFeedback> feedback_msg =
         transport_feedback_adapter_.ProcessTransportFeedback(feedback,
                                                              feedback_time);
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 78cf281..35e6bee 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -445,9 +445,6 @@
   fec_controller_->SetProtectionMethod(fec_enabled, NackEnabled());
 
   fec_controller_->SetProtectionCallback(this);
-  // Signal congestion controller this object is ready for OnPacket* callbacks.
-  transport_->GetStreamFeedbackProvider()->RegisterStreamFeedbackObserver(
-      rtp_config_.ssrcs, this);
 
   // Construction happens on the worker thread (see Call::CreateVideoSendStream)
   // but subseqeuent calls to the RTP state will happen on one of two threads:
@@ -466,8 +463,8 @@
 
   SetActiveModulesLocked(
       std::vector<bool>(rtp_streams_.size(), /*active=*/false));
-  transport_->GetStreamFeedbackProvider()->DeRegisterStreamFeedbackObserver(
-      this);
+
+  RTC_DCHECK(!registered_for_feedback_);
 }
 
 void RtpVideoSender::SetActive(bool active) {
@@ -475,8 +472,18 @@
   MutexLock lock(&mutex_);
   if (active_ == active)
     return;
+
   const std::vector<bool> active_modules(rtp_streams_.size(), active);
   SetActiveModulesLocked(active_modules);
+
+  auto* feedback_provider = transport_->GetStreamFeedbackProvider();
+  if (active && !registered_for_feedback_) {
+    feedback_provider->RegisterStreamFeedbackObserver(rtp_config_.ssrcs, this);
+    registered_for_feedback_ = true;
+  } else if (!active && registered_for_feedback_) {
+    feedback_provider->DeRegisterStreamFeedbackObserver(this);
+    registered_for_feedback_ = false;
+  }
 }
 
 void RtpVideoSender::SetActiveModules(const std::vector<bool> active_modules) {
diff --git a/call/rtp_video_sender.h b/call/rtp_video_sender.h
index 9832246..378f902 100644
--- a/call/rtp_video_sender.h
+++ b/call/rtp_video_sender.h
@@ -180,6 +180,7 @@
   // transport task queue.
   mutable Mutex mutex_;
   bool active_ RTC_GUARDED_BY(mutex_);
+  bool registered_for_feedback_ RTC_GUARDED_BY(transport_checker_) = false;
 
   const std::unique_ptr<FecController> fec_controller_;
   bool fec_allowed_ RTC_GUARDED_BY(mutex_);
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index 0644556..c47717d 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -13,6 +13,7 @@
 #include <atomic>
 #include <memory>
 #include <string>
+#include <utility>
 
 #include "call/rtp_transport_controller_send.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@@ -177,10 +178,33 @@
                                   /*frame_count_observer=*/nullptr,
                                   /*frame_transformer=*/nullptr) {}
 
+  ~RtpVideoSenderTestFixture() { SetActive(false); }
+
   RtpVideoSender* router() { return router_.get(); }
   MockTransport& transport() { return transport_; }
   void AdvanceTime(TimeDelta delta) { time_controller_.AdvanceTime(delta); }
 
+  void SetActive(bool active) {
+    RunOnTransportQueue([&]() { router_->SetActive(active); });
+  }
+
+  void SetActiveModules(const std::vector<bool>& active_modules) {
+    RunOnTransportQueue([&]() { router_->SetActiveModules(active_modules); });
+  }
+
+  // Several RtpVideoSender methods expect to be called on the task queue as
+  // owned by the send transport. While the SequenceChecker may pick up the
+  // default thread as the transport queue, explicit checks for the transport
+  // queue (not just using a SequenceChecker) aren't possible unless such a
+  // queue is actually active. So RunOnTransportQueue is a convenience function
+  // that allow for running a closure on the transport queue, similar to
+  // SendTask().
+  template <typename Closure>
+  void RunOnTransportQueue(Closure&& task) {
+    transport_controller_.GetWorkerQueue()->PostTask(std::move(task));
+    AdvanceTime(TimeDelta::Millis(0));
+  }
+
  private:
   NiceMock<MockTransport> transport_;
   NiceMock<MockRtcpIntraFrameObserver> encoder_feedback_;
@@ -217,15 +241,15 @@
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(false);
+  test.SetActive(false);
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
 }
@@ -244,7 +268,7 @@
   CodecSpecificInfo codec_info;
   codec_info.codecType = kVideoCodecVP8;
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
 
@@ -254,7 +278,7 @@
             test.router()->OnEncodedImage(encoded_image_2, &codec_info).error);
 
   // Inactive.
-  test.router()->SetActive(false);
+  test.SetActive(false);
   EXPECT_NE(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
   EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -284,14 +308,14 @@
   // Only setting one stream to active will still set the payload router to
   // active and allow sending data on the active stream.
   std::vector<bool> active_modules({true, false});
-  test.router()->SetActiveModules(active_modules);
+  test.SetActiveModules(active_modules);
   EXPECT_EQ(EncodedImageCallback::Result::OK,
             test.router()->OnEncodedImage(encoded_image_1, &codec_info).error);
 
   // Setting both streams to inactive will turn the payload router to
   // inactive.
   active_modules = {false, false};
-  test.router()->SetActiveModules(active_modules);
+  test.SetActiveModules(active_modules);
   // An incoming encoded image will not ask the module to send outgoing data
   // because the payload router is inactive.
   EXPECT_NE(EncodedImageCallback::Result::OK,
@@ -303,7 +327,7 @@
 TEST(RtpVideoSenderTest, CreateWithNoPreviousStates) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   std::map<uint32_t, RtpPayloadState> initial_states =
       test.router()->GetRtpPayloadStates();
@@ -328,7 +352,7 @@
 
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, states);
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   std::map<uint32_t, RtpPayloadState> initial_states =
       test.router()->GetRtpPayloadStates();
@@ -368,7 +392,7 @@
             test.router()->OnEncodedImage(encoded_image, nullptr).error);
   ::testing::Mock::VerifyAndClearExpectations(&callback);
 
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   FrameCounts frame_counts;
   EXPECT_CALL(callback, FrameCountUpdated(_, kSsrc1))
@@ -397,7 +421,7 @@
 TEST(RtpVideoSenderTest, DoesNotRetrasmitAckedPackets) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   constexpr uint8_t kPayload = 'a';
   EncodedImage encoded_image;
@@ -496,8 +520,8 @@
 }
 
 // This tests that we utilize transport wide feedback to retransmit lost
-// packets. This is tested by dropping all ordirary packets from a "lossy"
-// stream send along with an secondary untouched stream. The transport wide
+// packets. This is tested by dropping all ordinary packets from a "lossy"
+// stream sent along with a secondary untouched stream. The transport wide
 // feedback packets from the secondary stream allows the sending side to
 // detect and retreansmit the lost packets from the lossy stream.
 TEST(RtpVideoSenderTest, RetransmitsOnTransportWideLossInfo) {
@@ -562,7 +586,7 @@
 TEST(RtpVideoSenderTest, EarlyRetransmits) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {kRtxSsrc1, kRtxSsrc2},
                                  kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   const uint8_t kPayload[1] = {'a'};
   EncodedImage encoded_image;
@@ -657,7 +681,7 @@
 
 TEST(RtpVideoSenderTest, SupportsDependencyDescriptor) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -717,7 +741,7 @@
 
 TEST(RtpVideoSenderTest, SupportsDependencyDescriptorForVp9) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -773,7 +797,7 @@
 TEST(RtpVideoSenderTest,
      SupportsDependencyDescriptorForVp9NotProvidedByEncoder) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -828,7 +852,7 @@
   test::ScopedFieldTrials field_trials(
       "WebRTC-GenericCodecDependencyDescriptor/Enabled/");
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -874,7 +898,7 @@
 
 TEST(RtpVideoSenderTest, SupportsStoppingUsingDependencyDescriptor) {
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -932,7 +956,7 @@
 TEST(RtpVideoSenderTest,
      SupportsStoppingUsingDependencyDescriptorForVp8Simulcast) {
   RtpVideoSenderTestFixture test({kSsrc1, kSsrc2}, {}, kPayloadType, {});
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   RtpHeaderExtensionMap extensions;
   extensions.Register<RtpDependencyDescriptorExtension>(
@@ -1017,7 +1041,7 @@
       kRtpHeaderSizeBytes + kTransportPacketOverheadBytes;
   RtpVideoSenderTestFixture test({kSsrc1}, {}, kPayloadType, {});
   test.router()->OnTransportOverheadChanged(kTransportPacketOverheadBytes);
-  test.router()->SetActive(true);
+  test.SetActive(true);
 
   {
     test.router()->OnBitrateUpdated(CreateBitrateAllocationUpdate(300000),
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
index 29accb5..50987b2 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
@@ -44,11 +44,7 @@
 }
 
 void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
-  // Currently called on the send transport queue.
-  // TODO(tommi): When registration/unregistration as well as
-  // OnTransportFeedback callbacks occur on the transport queue, we can remove
-  // this lock.
-  MutexLock lock(&lock_);
+  RTC_DCHECK_RUN_ON(&observer_checker_);
 
   StreamFeedbackObserver::StreamPacketInfo info;
   info.ssrc = packet_info.media_ssrc;
@@ -66,24 +62,22 @@
 
 void TransportFeedbackDemuxer::OnTransportFeedback(
     const rtcp::TransportFeedback& feedback) {
+  RTC_DCHECK_RUN_ON(&observer_checker_);
+
   std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
-  {
-    MutexLock lock(&lock_);
-    for (const auto& packet : feedback.GetAllPackets()) {
-      int64_t seq_num =
-          seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
-      auto it = history_.find(seq_num);
-      if (it != history_.end()) {
-        auto packet_info = it->second;
-        packet_info.received = packet.received();
-        stream_feedbacks.push_back(std::move(packet_info));
-        if (packet.received())
-          history_.erase(it);
-      }
+  for (const auto& packet : feedback.GetAllPackets()) {
+    int64_t seq_num =
+        seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
+    auto it = history_.find(seq_num);
+    if (it != history_.end()) {
+      auto packet_info = it->second;
+      packet_info.received = packet.received();
+      stream_feedbacks.push_back(std::move(packet_info));
+      if (packet.received())
+        history_.erase(it);
     }
   }
 
-  RTC_DCHECK_RUN_ON(&observer_checker_);
   for (auto& observer : observers_) {
     std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
     for (const auto& packet_info : stream_feedbacks) {
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
index 895288f..7f4f575 100644
--- a/modules/congestion_controller/rtp/transport_feedback_demuxer.h
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
@@ -17,7 +17,6 @@
 #include "api/sequence_checker.h"
 #include "modules/include/module_common_types_public.h"
 #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
-#include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/system/no_unique_address.h"
 
 namespace webrtc {
@@ -45,15 +44,14 @@
   void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
 
  private:
-  Mutex lock_;
-  SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
+  SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&observer_checker_);
   std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
-      RTC_GUARDED_BY(&lock_);
+      RTC_GUARDED_BY(&observer_checker_);
 
   // Maps a set of ssrcs to corresponding observer. Vectors are used rather than
   // set/map to ensure that the processing order is consistent independently of
   // the randomized ssrcs.
-  RTC_NO_UNIQUE_ADDRESS SequenceChecker observer_checker_;
   std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
       observers_ RTC_GUARDED_BY(&observer_checker_);
 };