Split FlexfecReceiveStreamImpl init into worker / network steps.

This is comparable to this change for AudioReceiveStream:
https://webrtc-review.googlesource.com/c/src/+/220608/

Bug: webrtc:11993
Change-Id: I6bad7fa693441f80e86d8b021b8cf42727dc9142
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220609
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34170}
diff --git a/call/call.cc b/call/call.cc
index 5835412..76ba46f 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -1164,8 +1164,12 @@
   // OnRtpPacket until the constructor is finished and the object is
   // in a valid state, since OnRtpPacket runs on the same thread.
   receive_stream = new FlexfecReceiveStreamImpl(
-      clock_, &video_receiver_controller_, config, recovered_packet_receiver,
-      call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
+      clock_, config, recovered_packet_receiver, call_stats_->AsRtcpRttStats(),
+      module_process_thread_->process_thread());
+
+  // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
+  // thread.
+  receive_stream->RegisterWithTransport(&video_receiver_controller_);
 
   RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
              receive_rtp_config_.end());
@@ -1180,6 +1184,11 @@
   TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
   RTC_DCHECK_RUN_ON(worker_thread_);
 
+  FlexfecReceiveStreamImpl* receive_stream_impl =
+      static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
+  // TODO(bugs.webrtc.org/11993): Unregister on the network thread.
+  receive_stream_impl->UnregisterFromTransport();
+
   RTC_DCHECK(receive_stream != nullptr);
   const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
   uint32_t ssrc = config.remote_ssrc;
diff --git a/call/flexfec_receive_stream_impl.cc b/call/flexfec_receive_stream_impl.cc
index e629bca..c335b03 100644
--- a/call/flexfec_receive_stream_impl.cc
+++ b/call/flexfec_receive_stream_impl.cc
@@ -138,7 +138,6 @@
 
 FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
     Clock* clock,
-    RtpStreamReceiverControllerInterface* receiver_controller,
     const Config& config,
     RecoveredPacketReceiver* recovered_packet_receiver,
     RtcpRttStats* rtt_stats,
@@ -155,23 +154,11 @@
       process_thread_(process_thread) {
   RTC_LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString();
 
+  network_thread_checker_.Detach();
+
   // RTCP reporting.
   rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
   process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE);
-
-  // Register with transport.
-  // TODO(nisse): OnRtpPacket in this class delegates all real work to
-  // |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface
-  // here at all, we'd then delete the OnRtpPacket method and instead register
-  // |receiver_| as the RtpPacketSinkInterface for this stream.
-  // TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before
-  // the object is fully initialized, is risky. But it works in this case
-  // because locking in our caller, Call::CreateFlexfecReceiveStream, ensures
-  // that the demuxer doesn't call OnRtpPacket before this object is fully
-  // constructed. Registering |receiver_| instead of |this| would solve this
-  // problem too.
-  rtp_stream_receiver_ =
-      receiver_controller->CreateReceiver(config_.remote_ssrc, this);
 }
 
 FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
@@ -179,6 +166,27 @@
   process_thread_->DeRegisterModule(rtp_rtcp_.get());
 }
 
+void FlexfecReceiveStreamImpl::RegisterWithTransport(
+    RtpStreamReceiverControllerInterface* receiver_controller) {
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
+  RTC_DCHECK(!rtp_stream_receiver_);
+
+  if (!receiver_)
+    return;
+
+  // TODO(nisse): OnRtpPacket in this class delegates all real work to
+  // `receiver_`. So maybe we don't need to implement RtpPacketSinkInterface
+  // here at all, we'd then delete the OnRtpPacket method and instead register
+  // `receiver_` as the RtpPacketSinkInterface for this stream.
+  rtp_stream_receiver_ =
+      receiver_controller->CreateReceiver(config_.remote_ssrc, this);
+}
+
+void FlexfecReceiveStreamImpl::UnregisterFromTransport() {
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
+  rtp_stream_receiver_.reset();
+}
+
 void FlexfecReceiveStreamImpl::OnRtpPacket(const RtpPacketReceived& packet) {
   if (!receiver_)
     return;
diff --git a/call/flexfec_receive_stream_impl.h b/call/flexfec_receive_stream_impl.h
index 888dae9..1a4aa03 100644
--- a/call/flexfec_receive_stream_impl.h
+++ b/call/flexfec_receive_stream_impl.h
@@ -16,6 +16,7 @@
 #include "call/flexfec_receive_stream.h"
 #include "call/rtp_packet_sink_interface.h"
 #include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
+#include "rtc_base/system/no_unique_address.h"
 #include "system_wrappers/include/clock.h"
 
 namespace webrtc {
@@ -34,13 +35,26 @@
  public:
   FlexfecReceiveStreamImpl(
       Clock* clock,
-      RtpStreamReceiverControllerInterface* receiver_controller,
       const Config& config,
       RecoveredPacketReceiver* recovered_packet_receiver,
       RtcpRttStats* rtt_stats,
       ProcessThread* process_thread);
+  // Destruction happens on the worker thread. Prior to destruction the caller
+  // must ensure that a registration with the transport has been cleared. See
+  // `RegisterWithTransport` for details.
+  // TODO(tommi): As a further improvement to this, performing the full
+  // destruction on the network thread could be made the default.
   ~FlexfecReceiveStreamImpl() override;
 
+  // Called on the network thread to register/unregister with the network
+  // transport.
+  void RegisterWithTransport(
+      RtpStreamReceiverControllerInterface* receiver_controller);
+  // If registration has previously been done (via `RegisterWithTransport`) then
+  // `UnregisterFromTransport` must be called prior to destruction, on the
+  // network thread.
+  void UnregisterFromTransport();
+
   // RtpPacketSinkInterface.
   void OnRtpPacket(const RtpPacketReceived& packet) override;
 
@@ -48,6 +62,8 @@
   const Config& GetConfig() const override;
 
  private:
+  RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
+
   // Config.
   const Config config_;
 
@@ -57,9 +73,10 @@
   // RTCP reporting.
   const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
   const std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
-  ProcessThread* process_thread_;
+  ProcessThread* const process_thread_;
 
-  std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
+  std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_
+      RTC_GUARDED_BY(network_thread_checker_);
 };
 
 }  // namespace webrtc
diff --git a/call/flexfec_receive_stream_unittest.cc b/call/flexfec_receive_stream_unittest.cc
index 5e8ee47..80408c3 100644
--- a/call/flexfec_receive_stream_unittest.cc
+++ b/call/flexfec_receive_stream_unittest.cc
@@ -89,12 +89,14 @@
       : config_(CreateDefaultConfig(&rtcp_send_transport_)) {
     EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
     receive_stream_ = std::make_unique<FlexfecReceiveStreamImpl>(
-        Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
-        &recovered_packet_receiver_, &rtt_stats_, &process_thread_);
+        Clock::GetRealTimeClock(), config_, &recovered_packet_receiver_,
+        &rtt_stats_, &process_thread_);
+    receive_stream_->RegisterWithTransport(&rtp_stream_receiver_controller_);
   }
 
   ~FlexfecReceiveStreamTest() {
     EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
+    receive_stream_->UnregisterFromTransport();
   }
 
   MockTransport rtcp_send_transport_;
@@ -145,9 +147,10 @@
 
   ::testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
   EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
-  FlexfecReceiveStreamImpl receive_stream(
-      Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
-      &recovered_packet_receiver, &rtt_stats_, &process_thread_);
+  FlexfecReceiveStreamImpl receive_stream(Clock::GetRealTimeClock(), config_,
+                                          &recovered_packet_receiver,
+                                          &rtt_stats_, &process_thread_);
+  receive_stream.RegisterWithTransport(&rtp_stream_receiver_controller_);
 
   EXPECT_CALL(recovered_packet_receiver,
               OnRecoveredPacket(_, kRtpHeaderSize + kPayloadLength[1]));
@@ -156,6 +159,8 @@
 
   // Tear-down
   EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
+
+  receive_stream.UnregisterFromTransport();
 }
 
 }  // namespace webrtc