Introduce RtpStreamReceiverInterface and RtpStreamReceiverControllerInterface.
Also add the implementation class RtpStreamReceiverController.
It is responsible for demuxing, and acts as a factory for
RtpStreamReceiverInterface objects.
BUG=webrtc:7135
Review-Url: https://codereview.webrtc.org/2886993005
Cr-Commit-Position: refs/heads/master@{#18696}
diff --git a/webrtc/call/BUILD.gn b/webrtc/call/BUILD.gn
index 9807c64..aa98053 100644
--- a/webrtc/call/BUILD.gn
+++ b/webrtc/call/BUILD.gn
@@ -38,6 +38,7 @@
rtc_source_set("rtp_interfaces") {
sources = [
"rtp_packet_sink_interface.h",
+ "rtp_stream_receiver_controller_interface.h",
"rtp_transport_controller_send_interface.h",
]
}
@@ -46,6 +47,8 @@
sources = [
"rtp_demuxer.cc",
"rtp_demuxer.h",
+ "rtp_stream_receiver_controller.cc",
+ "rtp_stream_receiver_controller.h",
"rtx_receive_stream.cc",
"rtx_receive_stream.h",
]
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index c0861dd..b4a9456 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -34,7 +34,7 @@
#include "webrtc/call/bitrate_allocator.h"
#include "webrtc/call/call.h"
#include "webrtc/call/flexfec_receive_stream_impl.h"
-#include "webrtc/call/rtp_demuxer.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
#include "webrtc/call/rtp_transport_controller_send.h"
#include "webrtc/config.h"
#include "webrtc/logging/rtc_event_log/rtc_event_log.h"
@@ -275,10 +275,10 @@
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
GUARDED_BY(receive_crit_);
- // TODO(nisse): Should eventually be part of injected
- // RtpTransportControllerReceive, with a single demuxer in the bundled case.
- RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_);
- RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_);
+ // TODO(nisse): Should eventually be injected at creation,
+ // with a single object in the bundled case.
+ RtpStreamReceiverController audio_receiver_controller;
+ RtpStreamReceiverController video_receiver_controller;
// This extra map is used for receive processing which is
// independent of media type.
@@ -486,10 +486,6 @@
if (!parsed_packet.Parse(packet, length))
return rtc::Optional<RtpPacketReceived>();
- auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
- if (it != receive_rtp_config_.end())
- parsed_packet.IdentifyExtensions(it->second.extensions);
-
int64_t arrival_time_ms;
if (packet_time && packet_time->timestamp != -1) {
arrival_time_ms = (packet_time->timestamp + 500) / 1000;
@@ -646,12 +642,11 @@
TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
event_log_->LogAudioReceiveStreamConfig(CreateRtcLogStreamConfig(config));
- AudioReceiveStream* receive_stream =
- new AudioReceiveStream(transport_send_->packet_router(), config,
- config_.audio_state, event_log_);
+ AudioReceiveStream* receive_stream = new AudioReceiveStream(
+ &audio_receiver_controller, transport_send_->packet_router(), config,
+ config_.audio_state, event_log_);
{
WriteLockScoped write_lock(*receive_crit_);
- audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
receive_rtp_config_[config.rtp.remote_ssrc] =
ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
audio_receive_streams_.insert(receive_stream);
@@ -683,8 +678,6 @@
uint32_t ssrc = config.rtp.remote_ssrc;
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
- size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream);
- RTC_DCHECK(num_deleted == 1);
audio_receive_streams_.erase(audio_receive_stream);
const std::string& sync_group = audio_receive_stream->config().sync_group;
const auto it = sync_stream_mapping_.find(sync_group);
@@ -776,19 +769,17 @@
TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
- VideoReceiveStream* receive_stream =
- new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(),
- std::move(configuration),
- module_process_thread_.get(), call_stats_.get());
+ VideoReceiveStream* receive_stream = new VideoReceiveStream(
+ &video_receiver_controller, num_cpu_cores_,
+ transport_send_->packet_router(), std::move(configuration),
+ module_process_thread_.get(), call_stats_.get());
const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
ReceiveRtpConfig receive_config(config.rtp.extensions,
UseSendSideBwe(config));
{
WriteLockScoped write_lock(*receive_crit_);
- video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
if (config.rtp.rtx_ssrc) {
- video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream);
// We record identical config for the rtx stream as for the main
// stream. Since the transport_send_cc negotiation is per payload
// type, we may get an incorrect value for the rtx stream, but
@@ -817,8 +808,6 @@
WriteLockScoped write_lock(*receive_crit_);
// Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
// separate SSRC there can be either one or two.
- size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl);
- RTC_DCHECK_GE(num_deleted, 1);
receive_rtp_config_.erase(config.rtp.remote_ssrc);
if (config.rtp.rtx_ssrc) {
receive_rtp_config_.erase(config.rtp.rtx_ssrc);
@@ -840,16 +829,22 @@
RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
RecoveredPacketReceiver* recovered_packet_receiver = this;
- FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl(
- config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
- module_process_thread_.get());
+ FlexfecReceiveStreamImpl* receive_stream;
{
WriteLockScoped write_lock(*receive_crit_);
- video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream);
-
- for (auto ssrc : config.protected_media_ssrcs)
- video_rtp_demuxer_.AddSink(ssrc, receive_stream);
+ // Unlike the video and audio receive streams,
+ // FlexfecReceiveStream implements RtpPacketSinkInterface itself,
+ // and hence its constructor passes its |this| pointer to
+ // video_receiver_controller.CreateReceiver(). Calling the
+ // constructor while holding |receive_crit_| ensures that we don't
+ // call OnRtpPacket until the constructor is finished and the
+ // object is in a valid state.
+ // TODO(nisse): Fix constructor so that it can be moved outside of
+ // this locked scope.
+ receive_stream = new FlexfecReceiveStreamImpl(
+ &video_receiver_controller, config, recovered_packet_receiver,
+ call_stats_->rtcp_rtt_stats(), module_process_thread_.get());
RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
receive_rtp_config_.end());
@@ -881,7 +876,6 @@
// Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
// destroyed.
- video_rtp_demuxer_.RemoveSink(receive_stream_impl);
receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
->RemoveStream(ssrc);
}
@@ -1302,17 +1296,31 @@
if (!parsed_packet)
return DELIVERY_PACKET_ERROR;
+ auto it = receive_rtp_config_.find(parsed_packet->Ssrc());
+ if (it == receive_rtp_config_.end()) {
+ LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
+ << parsed_packet->Ssrc();
+ // Destruction of the receive stream, including deregistering from the
+ // RtpDemuxer, is not protected by the |receive_crit_| lock. But
+ // deregistering in the |receive_rtp_config_| map is protected by that lock.
+ // So by not passing the packet on to demuxing in this case, we prevent
+ // incoming packets from being passed on via the demuxer to a receive
+ // stream which is being torn down.
+ return DELIVERY_UNKNOWN_SSRC;
+ }
+ parsed_packet->IdentifyExtensions(it->second.extensions);
+
NotifyBweOfReceivedPacket(*parsed_packet, media_type);
if (media_type == MediaType::AUDIO) {
- if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
+ if (audio_receiver_controller.OnRtpPacket(*parsed_packet)) {
received_bytes_per_second_counter_.Add(static_cast<int>(length));
received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
event_log_->LogRtpHeader(kIncomingPacket, packet, length);
return DELIVERY_OK;
}
} else if (media_type == MediaType::VIDEO) {
- if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
+ if (video_receiver_controller.OnRtpPacket(*parsed_packet)) {
received_bytes_per_second_counter_.Add(static_cast<int>(length));
received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
event_log_->LogRtpHeader(kIncomingPacket, packet, length);
@@ -1348,7 +1356,7 @@
parsed_packet->set_recovered(true);
- video_rtp_demuxer_.OnRtpPacket(*parsed_packet);
+ video_receiver_controller.OnRtpPacket(*parsed_packet);
}
void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc
index f010433..c73d9e9 100644
--- a/webrtc/call/flexfec_receive_stream_impl.cc
+++ b/webrtc/call/flexfec_receive_stream_impl.cc
@@ -15,6 +15,7 @@
#include "webrtc/base/checks.h"
#include "webrtc/base/location.h"
#include "webrtc/base/logging.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
#include "webrtc/modules/rtp_rtcp/include/receive_statistics.h"
#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
@@ -122,6 +123,7 @@
} // namespace
FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
+ RtpStreamReceiverControllerInterface* receiver_controller,
const Config& config,
RecoveredPacketReceiver* recovered_packet_receiver,
RtcpRttStats* rtt_stats,
@@ -141,6 +143,22 @@
rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
rtp_rtcp_->SetSSRC(config_.local_ssrc);
process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE);
+
+ // Register with transport.
+ // TODO(nisse): OnRtpPacket in this class delegates all real work to
+ // |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface
+ // here at all, we'd then delete the OnRtpPacket method and instead register
+ // |receiver_| as the RtpPacketSinkInterface for this stream.
+ // TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before
+ // the object is fully initialized, is risky. But it works in this case
+ // because locking in our caller, Call::CreateFlexfecReceiveStream, ensures
+ // that the demuxer doesn't call OnRtpPacket before this object is fully
+ // constructed. Registering |receiver_| instead of |this| would solve this
+ // problem too.
+ rtp_stream_receiver_ =
+ receiver_controller->CreateReceiver(config_.remote_ssrc, this);
+ for (uint32_t ssrc : config.protected_media_ssrcs)
+ receiver_controller->AddSink(ssrc, this);
}
FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h
index e4c2294..a89940f 100644
--- a/webrtc/call/flexfec_receive_stream_impl.h
+++ b/webrtc/call/flexfec_receive_stream_impl.h
@@ -26,14 +26,18 @@
class RtcpRttStats;
class RtpPacketReceived;
class RtpRtcp;
+class RtpStreamReceiverControllerInterface;
+class RtpStreamReceiverInterface;
class FlexfecReceiveStreamImpl : public FlexfecReceiveStream,
public RtpPacketSinkInterface {
public:
- FlexfecReceiveStreamImpl(const Config& config,
- RecoveredPacketReceiver* recovered_packet_receiver,
- RtcpRttStats* rtt_stats,
- ProcessThread* process_thread);
+ FlexfecReceiveStreamImpl(
+ RtpStreamReceiverControllerInterface* receiver_controller,
+ const Config& config,
+ RecoveredPacketReceiver* recovered_packet_receiver,
+ RtcpRttStats* rtt_stats,
+ ProcessThread* process_thread);
~FlexfecReceiveStreamImpl() override;
const Config& GetConfig() const { return config_; }
@@ -59,6 +63,8 @@
const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
const std::unique_ptr<RtpRtcp> rtp_rtcp_;
ProcessThread* process_thread_;
+
+ std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
};
} // namespace webrtc
diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc
index 46bd7c3..ba41406 100644
--- a/webrtc/call/flexfec_receive_stream_unittest.cc
+++ b/webrtc/call/flexfec_receive_stream_unittest.cc
@@ -12,6 +12,7 @@
#include "webrtc/base/array_view.h"
#include "webrtc/call/flexfec_receive_stream_impl.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
#include "webrtc/modules/pacing/packet_router.h"
#include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
#include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h"
@@ -77,7 +78,8 @@
protected:
FlexfecReceiveStreamTest()
: config_(CreateDefaultConfig(&rtcp_send_transport_)),
- receive_stream_(config_,
+ receive_stream_(&rtp_stream_receiver_controller_,
+ config_,
&recovered_packet_receiver_,
&rtt_stats_,
&process_thread_) {}
@@ -87,7 +89,7 @@
MockRecoveredPacketReceiver recovered_packet_receiver_;
MockRtcpRttStats rtt_stats_;
MockProcessThread process_thread_;
-
+ RtpStreamReceiverController rtp_stream_receiver_controller_;
FlexfecReceiveStreamImpl receive_stream_;
};
@@ -134,7 +136,8 @@
// clang-format on
testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
- FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver,
+ FlexfecReceiveStreamImpl receive_stream(&rtp_stream_receiver_controller_,
+ config_, &recovered_packet_receiver,
&rtt_stats_, &process_thread_);
// Do not call back before being started.
diff --git a/webrtc/call/rtp_stream_receiver_controller.cc b/webrtc/call/rtp_stream_receiver_controller.cc
new file mode 100644
index 0000000..a4b1e36
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller.cc
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtp_stream_receiver_controller.h"
+#include "webrtc/base/ptr_util.h"
+
+namespace webrtc {
+
+RtpStreamReceiverController::Receiver::Receiver(
+ RtpStreamReceiverController* controller,
+ uint32_t ssrc,
+ RtpPacketSinkInterface* sink)
+ : controller_(controller), sink_(sink) {
+ controller_->AddSink(ssrc, sink_);
+}
+
+RtpStreamReceiverController::Receiver::~Receiver() {
+ // Don't require return value > 0, since for RTX we currently may
+ // have multiple Receiver objects with the same sink.
+ // TODO(nisse): Consider adding a DCHECK when RtxReceiveStream is wired up.
+ controller_->RemoveSink(sink_);
+}
+
+RtpStreamReceiverController::RtpStreamReceiverController() = default;
+RtpStreamReceiverController::~RtpStreamReceiverController() = default;
+
+std::unique_ptr<RtpStreamReceiverInterface>
+RtpStreamReceiverController::CreateReceiver(
+ uint32_t ssrc,
+ RtpPacketSinkInterface* sink) {
+ return rtc::MakeUnique<Receiver>(this, ssrc, sink);
+}
+
+bool RtpStreamReceiverController::OnRtpPacket(const RtpPacketReceived& packet) {
+ rtc::CritScope cs(&lock_);
+ return demuxer_.OnRtpPacket(packet);
+}
+
+void RtpStreamReceiverController::AddSink(uint32_t ssrc,
+ RtpPacketSinkInterface* sink) {
+ rtc::CritScope cs(&lock_);
+ return demuxer_.AddSink(ssrc, sink);
+}
+
+size_t RtpStreamReceiverController::RemoveSink(
+ const RtpPacketSinkInterface* sink) {
+ rtc::CritScope cs(&lock_);
+ return demuxer_.RemoveSink(sink);
+}
+
+} // namespace webrtc
diff --git a/webrtc/call/rtp_stream_receiver_controller.h b/webrtc/call/rtp_stream_receiver_controller.h
new file mode 100644
index 0000000..5c8ed67
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller.h
@@ -0,0 +1,72 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
+#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
+
+#include <memory>
+
+#include "webrtc/base/criticalsection.h"
+#include "webrtc/call/rtp_demuxer.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
+
+namespace webrtc {
+
+class RtpPacketReceived;
+
+// This class represents the RTP receive parsing and demuxing, for a
+// single RTP session.
+// TODO(nisse): Add RTCP processing, we should aim to terminate RTCP
+// and not leave any RTCP processing to individual receive streams.
+// TODO(nisse): Extract per-packet processing, including parsing and
+// demuxing, into a separate class.
+class RtpStreamReceiverController
+ : public RtpStreamReceiverControllerInterface {
+ public:
+ RtpStreamReceiverController();
+ ~RtpStreamReceiverController() override;
+
+ // Implements RtpStreamReceiverControllerInterface.
+ std::unique_ptr<RtpStreamReceiverInterface> CreateReceiver(
+ uint32_t ssrc,
+ RtpPacketSinkInterface* sink) override;
+
+ // Thread-safe wrappers for the corresponding RtpDemuxer methods.
+ void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) override;
+ size_t RemoveSink(const RtpPacketSinkInterface* sink) override;
+
+ // TODO(nisse): Not yet responsible for parsing.
+ bool OnRtpPacket(const RtpPacketReceived& packet);
+
+ private:
+ class Receiver : public RtpStreamReceiverInterface {
+ public:
+ Receiver(RtpStreamReceiverController* controller,
+ uint32_t ssrc,
+ RtpPacketSinkInterface* sink);
+
+ ~Receiver() override;
+
+ private:
+ RtpStreamReceiverController* const controller_;
+ RtpPacketSinkInterface* const sink_;
+ };
+
+ // TODO(nisse): Move to a TaskQueue for synchronization. When used
+ // by Call, we expect construction and all methods but OnRtpPacket
+ // to be called on the same thread, and OnRtpPacket to be called
+ // by a single, but possibly distinct, thread. But applications not
+ // using Call may use threads differently.
+ rtc::CriticalSection lock_;
+ RtpDemuxer demuxer_ GUARDED_BY(&lock_);
+};
+
+} // namespace webrtc
+
+#endif // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
diff --git a/webrtc/call/rtp_stream_receiver_controller_interface.h b/webrtc/call/rtp_stream_receiver_controller_interface.h
new file mode 100644
index 0000000..51d25a5
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller_interface.h
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ * Use of this source code is governed by a BSD-style license
+ * that can be found in the LICENSE file in the root of the source
+ * tree. An additional intellectual property rights grant can be found
+ * in the file PATENTS. All contributing project authors may
+ * be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_
+#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_
+
+#include <memory>
+
+#include "webrtc/call/rtp_packet_sink_interface.h"
+
+namespace webrtc {
+
+// An RtpStreamReceiver is responsible for the rtp-specific but
+// media-independent state needed for receiving an RTP stream.
+// TODO(nisse): Currently, only owns the association between ssrc and
+// the stream's RtpPacketSinkInterface. Ownership of corresponding
+// objects from modules/rtp_rtcp/ should move to this class (or
+// rather, the corresponding implementation class). We should add
+// methods for getting rtp receive stats, and for sending RTCP
+// messages related to the receive stream.
+class RtpStreamReceiverInterface {
+ public:
+ virtual ~RtpStreamReceiverInterface() {}
+};
+
+// This class acts as a factory for RtpStreamReceiver objects.
+class RtpStreamReceiverControllerInterface {
+ public:
+ virtual ~RtpStreamReceiverControllerInterface() {}
+
+ virtual std::unique_ptr<RtpStreamReceiverInterface> CreateReceiver(
+ uint32_t ssrc,
+ RtpPacketSinkInterface* sink) = 0;
+ // For registering additional sinks, needed for FlexFEC.
+ virtual void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) = 0;
+ virtual size_t RemoveSink(const RtpPacketSinkInterface* sink) = 0;
+};
+
+} // namespace webrtc
+
+#endif // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_