Split FlexfecReceiveStreamImpl init into worker / network steps.
This is comparable to this change for AudioReceiveStream:
https://webrtc-review.googlesource.com/c/src/+/220608/
Bug: webrtc:11993
Change-Id: I6bad7fa693441f80e86d8b021b8cf42727dc9142
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220609
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34170}
diff --git a/call/call.cc b/call/call.cc
index 5835412..76ba46f 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -1164,8 +1164,12 @@
// OnRtpPacket until the constructor is finished and the object is
// in a valid state, since OnRtpPacket runs on the same thread.
receive_stream = new FlexfecReceiveStreamImpl(
- clock_, &video_receiver_controller_, config, recovered_packet_receiver,
- call_stats_->AsRtcpRttStats(), module_process_thread_->process_thread());
+ clock_, config, recovered_packet_receiver, call_stats_->AsRtcpRttStats(),
+ module_process_thread_->process_thread());
+
+ // TODO(bugs.webrtc.org/11993): Set this up asynchronously on the network
+ // thread.
+ receive_stream->RegisterWithTransport(&video_receiver_controller_);
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
@@ -1180,6 +1184,11 @@
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
RTC_DCHECK_RUN_ON(worker_thread_);
+ FlexfecReceiveStreamImpl* receive_stream_impl =
+ static_cast<FlexfecReceiveStreamImpl*>(receive_stream);
+ // TODO(bugs.webrtc.org/11993): Unregister on the network thread.
+ receive_stream_impl->UnregisterFromTransport();
+
RTC_DCHECK(receive_stream != nullptr);
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
uint32_t ssrc = config.remote_ssrc;
diff --git a/call/flexfec_receive_stream_impl.cc b/call/flexfec_receive_stream_impl.cc
index e629bca..c335b03 100644
--- a/call/flexfec_receive_stream_impl.cc
+++ b/call/flexfec_receive_stream_impl.cc
@@ -138,7 +138,6 @@
FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
Clock* clock,
- RtpStreamReceiverControllerInterface* receiver_controller,
const Config& config,
RecoveredPacketReceiver* recovered_packet_receiver,
RtcpRttStats* rtt_stats,
@@ -155,23 +154,11 @@
process_thread_(process_thread) {
RTC_LOG(LS_INFO) << "FlexfecReceiveStreamImpl: " << config_.ToString();
+ network_thread_checker_.Detach();
+
// RTCP reporting.
rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE);
-
- // Register with transport.
- // TODO(nisse): OnRtpPacket in this class delegates all real work to
- // |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface
- // here at all, we'd then delete the OnRtpPacket method and instead register
- // |receiver_| as the RtpPacketSinkInterface for this stream.
- // TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before
- // the object is fully initialized, is risky. But it works in this case
- // because locking in our caller, Call::CreateFlexfecReceiveStream, ensures
- // that the demuxer doesn't call OnRtpPacket before this object is fully
- // constructed. Registering |receiver_| instead of |this| would solve this
- // problem too.
- rtp_stream_receiver_ =
- receiver_controller->CreateReceiver(config_.remote_ssrc, this);
}
FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
@@ -179,6 +166,27 @@
process_thread_->DeRegisterModule(rtp_rtcp_.get());
}
+void FlexfecReceiveStreamImpl::RegisterWithTransport(
+ RtpStreamReceiverControllerInterface* receiver_controller) {
+ RTC_DCHECK_RUN_ON(&network_thread_checker_);
+ RTC_DCHECK(!rtp_stream_receiver_);
+
+ if (!receiver_)
+ return;
+
+ // TODO(nisse): OnRtpPacket in this class delegates all real work to
+ // `receiver_`. So maybe we don't need to implement RtpPacketSinkInterface
+ // here at all, we'd then delete the OnRtpPacket method and instead register
+ // `receiver_` as the RtpPacketSinkInterface for this stream.
+ rtp_stream_receiver_ =
+ receiver_controller->CreateReceiver(config_.remote_ssrc, this);
+}
+
+void FlexfecReceiveStreamImpl::UnregisterFromTransport() {
+ RTC_DCHECK_RUN_ON(&network_thread_checker_);
+ rtp_stream_receiver_.reset();
+}
+
void FlexfecReceiveStreamImpl::OnRtpPacket(const RtpPacketReceived& packet) {
if (!receiver_)
return;
diff --git a/call/flexfec_receive_stream_impl.h b/call/flexfec_receive_stream_impl.h
index 888dae9..1a4aa03 100644
--- a/call/flexfec_receive_stream_impl.h
+++ b/call/flexfec_receive_stream_impl.h
@@ -16,6 +16,7 @@
#include "call/flexfec_receive_stream.h"
#include "call/rtp_packet_sink_interface.h"
#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
+#include "rtc_base/system/no_unique_address.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
@@ -34,13 +35,26 @@
public:
FlexfecReceiveStreamImpl(
Clock* clock,
- RtpStreamReceiverControllerInterface* receiver_controller,
const Config& config,
RecoveredPacketReceiver* recovered_packet_receiver,
RtcpRttStats* rtt_stats,
ProcessThread* process_thread);
+ // Destruction happens on the worker thread. Prior to destruction the caller
+ // must ensure that a registration with the transport has been cleared. See
+ // `RegisterWithTransport` for details.
+ // TODO(tommi): As a further improvement to this, performing the full
+ // destruction on the network thread could be made the default.
~FlexfecReceiveStreamImpl() override;
+ // Called on the network thread to register/unregister with the network
+ // transport.
+ void RegisterWithTransport(
+ RtpStreamReceiverControllerInterface* receiver_controller);
+ // If registration has previously been done (via `RegisterWithTransport`) then
+ // `UnregisterFromTransport` must be called prior to destruction, on the
+ // network thread.
+ void UnregisterFromTransport();
+
// RtpPacketSinkInterface.
void OnRtpPacket(const RtpPacketReceived& packet) override;
@@ -48,6 +62,8 @@
const Config& GetConfig() const override;
private:
+ RTC_NO_UNIQUE_ADDRESS SequenceChecker network_thread_checker_;
+
// Config.
const Config config_;
@@ -57,9 +73,10 @@
// RTCP reporting.
const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
const std::unique_ptr<ModuleRtpRtcpImpl2> rtp_rtcp_;
- ProcessThread* process_thread_;
+ ProcessThread* const process_thread_;
- std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
+ std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_
+ RTC_GUARDED_BY(network_thread_checker_);
};
} // namespace webrtc
diff --git a/call/flexfec_receive_stream_unittest.cc b/call/flexfec_receive_stream_unittest.cc
index 5e8ee47..80408c3 100644
--- a/call/flexfec_receive_stream_unittest.cc
+++ b/call/flexfec_receive_stream_unittest.cc
@@ -89,12 +89,14 @@
: config_(CreateDefaultConfig(&rtcp_send_transport_)) {
EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
receive_stream_ = std::make_unique<FlexfecReceiveStreamImpl>(
- Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
- &recovered_packet_receiver_, &rtt_stats_, &process_thread_);
+ Clock::GetRealTimeClock(), config_, &recovered_packet_receiver_,
+ &rtt_stats_, &process_thread_);
+ receive_stream_->RegisterWithTransport(&rtp_stream_receiver_controller_);
}
~FlexfecReceiveStreamTest() {
EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
+ receive_stream_->UnregisterFromTransport();
}
MockTransport rtcp_send_transport_;
@@ -145,9 +147,10 @@
::testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
EXPECT_CALL(process_thread_, RegisterModule(_, _)).Times(1);
- FlexfecReceiveStreamImpl receive_stream(
- Clock::GetRealTimeClock(), &rtp_stream_receiver_controller_, config_,
- &recovered_packet_receiver, &rtt_stats_, &process_thread_);
+ FlexfecReceiveStreamImpl receive_stream(Clock::GetRealTimeClock(), config_,
+ &recovered_packet_receiver,
+ &rtt_stats_, &process_thread_);
+ receive_stream.RegisterWithTransport(&rtp_stream_receiver_controller_);
EXPECT_CALL(recovered_packet_receiver,
OnRecoveredPacket(_, kRtpHeaderSize + kPayloadLength[1]));
@@ -156,6 +159,8 @@
// Tear-down
EXPECT_CALL(process_thread_, DeRegisterModule(_)).Times(1);
+
+ receive_stream.UnregisterFromTransport();
}
} // namespace webrtc