Introduce RtpStreamReceiverInterface and RtpStreamReceiverControllerInterface.

And implementation class RtpStreamReceiverController.
It's responsible for demuxing, and acts as factory for
RtpStreamReceiverInterface.

BUG=webrtc:7135

Review-Url: https://codereview.webrtc.org/2886993005
Cr-Commit-Position: refs/heads/master@{#18696}
diff --git a/webrtc/audio/BUILD.gn b/webrtc/audio/BUILD.gn
index 48a6706..414a7c6 100644
--- a/webrtc/audio/BUILD.gn
+++ b/webrtc/audio/BUILD.gn
@@ -79,6 +79,7 @@
       "../api:mock_audio_mixer",
       "../base:rtc_base_approved",
       "../base:rtc_task_queue",
+      "../call:rtp_receiver",
       "../modules/audio_device:mock_audio_device",
       "../modules/audio_mixer:audio_mixer_impl",
       "../modules/congestion_controller:congestion_controller",
diff --git a/webrtc/audio/audio_receive_stream.cc b/webrtc/audio/audio_receive_stream.cc
index cb90a68..079e971 100644
--- a/webrtc/audio/audio_receive_stream.cc
+++ b/webrtc/audio/audio_receive_stream.cc
@@ -20,6 +20,7 @@
 #include "webrtc/base/checks.h"
 #include "webrtc/base/logging.h"
 #include "webrtc/base/timeutils.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
 #include "webrtc/modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
 #include "webrtc/modules/rtp_rtcp/include/rtp_receiver.h"
 #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
@@ -62,12 +63,12 @@
 
 namespace internal {
 AudioReceiveStream::AudioReceiveStream(
+    RtpStreamReceiverControllerInterface* receiver_controller,
     PacketRouter* packet_router,
     const webrtc::AudioReceiveStream::Config& config,
     const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
     webrtc::RtcEventLog* event_log)
-    : config_(config),
-      audio_state_(audio_state) {
+    : config_(config), audio_state_(audio_state) {
   LOG(LS_INFO) << "AudioReceiveStream: " << config_.ToString();
   RTC_DCHECK_NE(config_.voe_channel_id, -1);
   RTC_DCHECK(audio_state_.get());
@@ -107,6 +108,11 @@
   }
   // Configure bandwidth estimation.
   channel_proxy_->RegisterReceiverCongestionControlObjects(packet_router);
+
+  // Register with transport.
+  rtp_stream_receiver_ =
+      receiver_controller->CreateReceiver(config_.rtp.remote_ssrc,
+                                          channel_proxy_.get());
 }
 
 AudioReceiveStream::~AudioReceiveStream() {
diff --git a/webrtc/audio/audio_receive_stream.h b/webrtc/audio/audio_receive_stream.h
index 7dcc6d3..92c4763 100644
--- a/webrtc/audio/audio_receive_stream.h
+++ b/webrtc/audio/audio_receive_stream.h
@@ -26,6 +26,8 @@
 class PacketRouter;
 class RtcEventLog;
 class RtpPacketReceived;
+class RtpStreamReceiverControllerInterface;
+class RtpStreamReceiverInterface;
 
 namespace voe {
 class ChannelProxy;
@@ -36,10 +38,10 @@
 
 class AudioReceiveStream final : public webrtc::AudioReceiveStream,
                                  public AudioMixer::Source,
-                                 public Syncable,
-                                 public RtpPacketSinkInterface {
+                                 public Syncable {
  public:
-  AudioReceiveStream(PacketRouter* packet_router,
+  AudioReceiveStream(RtpStreamReceiverControllerInterface* receiver_controller,
+                     PacketRouter* packet_router,
                      const webrtc::AudioReceiveStream::Config& config,
                      const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
                      webrtc::RtcEventLog* event_log);
@@ -54,8 +56,11 @@
   void SetGain(float gain) override;
   std::vector<webrtc::RtpSource> GetSources() const override;
 
-  // RtpPacketSinkInterface.
-  void OnRtpPacket(const RtpPacketReceived& packet) override;
+  // TODO(nisse): We don't formally implement RtpPacketSinkInterface, and this
+  // method shouldn't be needed. But it's currently used by the
+  // AudioReceiveStreamTest.ReceiveRtpPacket unittest. Figure out if that test
+  // shuld be refactored or deleted, and then delete this method.
+  void OnRtpPacket(const RtpPacketReceived& packet);
 
   // AudioMixer::Source
   AudioFrameInfo GetAudioFrameWithInfo(int sample_rate_hz,
@@ -87,6 +92,8 @@
 
   bool playing_ ACCESS_ON(worker_thread_checker_) = false;
 
+  std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
+
   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(AudioReceiveStream);
 };
 }  // namespace internal
diff --git a/webrtc/audio/audio_receive_stream_unittest.cc b/webrtc/audio/audio_receive_stream_unittest.cc
index f600b85..84efb20 100644
--- a/webrtc/audio/audio_receive_stream_unittest.cc
+++ b/webrtc/audio/audio_receive_stream_unittest.cc
@@ -15,6 +15,7 @@
 #include "webrtc/api/test/mock_audio_mixer.h"
 #include "webrtc/audio/audio_receive_stream.h"
 #include "webrtc/audio/conversion.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
 #include "webrtc/logging/rtc_event_log/mock/mock_rtc_event_log.h"
 #include "webrtc/modules/bitrate_controller/include/mock/mock_bitrate_controller.h"
 #include "webrtc/modules/pacing/packet_router.h"
@@ -137,6 +138,9 @@
   rtc::scoped_refptr<MockAudioMixer> audio_mixer() { return audio_mixer_; }
   MockVoiceEngine& voice_engine() { return voice_engine_; }
   MockVoEChannelProxy* channel_proxy() { return channel_proxy_; }
+  RtpStreamReceiverControllerInterface* rtp_stream_receiver_controller() {
+    return &rtp_stream_receiver_controller_;
+  }
 
   void SetupMockForGetStats() {
     using testing::DoAll;
@@ -166,6 +170,7 @@
   rtc::scoped_refptr<MockAudioMixer> audio_mixer_;
   AudioReceiveStream::Config stream_config_;
   testing::StrictMock<MockVoEChannelProxy>* channel_proxy_ = nullptr;
+  RtpStreamReceiverController rtp_stream_receiver_controller_;
 };
 
 void BuildOneByteExtension(std::vector<uint8_t>::iterator it,
@@ -238,6 +243,7 @@
 TEST(AudioReceiveStreamTest, ConstructDestruct) {
   ConfigHelper helper;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
 }
@@ -246,6 +252,7 @@
   ConfigHelper helper;
   helper.config().rtp.transport_cc = true;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
   const int kTransportSequenceNumberValue = 1234;
@@ -267,6 +274,7 @@
   ConfigHelper helper;
   helper.config().rtp.transport_cc = true;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
 
@@ -280,6 +288,7 @@
 TEST(AudioReceiveStreamTest, GetStats) {
   ConfigHelper helper;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
   helper.SetupMockForGetStats();
@@ -325,6 +334,7 @@
 TEST(AudioReceiveStreamTest, SetGain) {
   ConfigHelper helper;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
   EXPECT_CALL(*helper.channel_proxy(),
@@ -335,6 +345,7 @@
 TEST(AudioReceiveStreamTest, StreamShouldNotBeAddedToMixerWhenVoEReturnsError) {
   ConfigHelper helper;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
 
@@ -347,6 +358,7 @@
 TEST(AudioReceiveStreamTest, StreamShouldBeAddedToMixerOnStart) {
   ConfigHelper helper;
   internal::AudioReceiveStream recv_stream(
+      helper.rtp_stream_receiver_controller(),
       helper.packet_router(),
       helper.config(), helper.audio_state(), helper.event_log());
 
diff --git a/webrtc/call/BUILD.gn b/webrtc/call/BUILD.gn
index 9807c64..aa98053 100644
--- a/webrtc/call/BUILD.gn
+++ b/webrtc/call/BUILD.gn
@@ -38,6 +38,7 @@
 rtc_source_set("rtp_interfaces") {
   sources = [
     "rtp_packet_sink_interface.h",
+    "rtp_stream_receiver_controller_interface.h",
     "rtp_transport_controller_send_interface.h",
   ]
 }
@@ -46,6 +47,8 @@
   sources = [
     "rtp_demuxer.cc",
     "rtp_demuxer.h",
+    "rtp_stream_receiver_controller.cc",
+    "rtp_stream_receiver_controller.h",
     "rtx_receive_stream.cc",
     "rtx_receive_stream.h",
   ]
diff --git a/webrtc/call/call.cc b/webrtc/call/call.cc
index c0861dd..b4a9456 100644
--- a/webrtc/call/call.cc
+++ b/webrtc/call/call.cc
@@ -34,7 +34,7 @@
 #include "webrtc/call/bitrate_allocator.h"
 #include "webrtc/call/call.h"
 #include "webrtc/call/flexfec_receive_stream_impl.h"
-#include "webrtc/call/rtp_demuxer.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
 #include "webrtc/call/rtp_transport_controller_send.h"
 #include "webrtc/config.h"
 #include "webrtc/logging/rtc_event_log/rtc_event_log.h"
@@ -275,10 +275,10 @@
   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
       GUARDED_BY(receive_crit_);
 
-  // TODO(nisse): Should eventually be part of injected
-  // RtpTransportControllerReceive, with a single demuxer in the bundled case.
-  RtpDemuxer audio_rtp_demuxer_ GUARDED_BY(receive_crit_);
-  RtpDemuxer video_rtp_demuxer_ GUARDED_BY(receive_crit_);
+  // TODO(nisse): Should eventually be injected at creation,
+  // with a single object in the bundled case.
+  RtpStreamReceiverController audio_receiver_controller;
+  RtpStreamReceiverController video_receiver_controller;
 
   // This extra map is used for receive processing which is
   // independent of media type.
@@ -486,10 +486,6 @@
   if (!parsed_packet.Parse(packet, length))
     return rtc::Optional<RtpPacketReceived>();
 
-  auto it = receive_rtp_config_.find(parsed_packet.Ssrc());
-  if (it != receive_rtp_config_.end())
-    parsed_packet.IdentifyExtensions(it->second.extensions);
-
   int64_t arrival_time_ms;
   if (packet_time && packet_time->timestamp != -1) {
     arrival_time_ms = (packet_time->timestamp + 500) / 1000;
@@ -646,12 +642,11 @@
   TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
   RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
   event_log_->LogAudioReceiveStreamConfig(CreateRtcLogStreamConfig(config));
-  AudioReceiveStream* receive_stream =
-      new AudioReceiveStream(transport_send_->packet_router(), config,
-                             config_.audio_state, event_log_);
+  AudioReceiveStream* receive_stream = new AudioReceiveStream(
+      &audio_receiver_controller, transport_send_->packet_router(), config,
+      config_.audio_state, event_log_);
   {
     WriteLockScoped write_lock(*receive_crit_);
-    audio_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
     receive_rtp_config_[config.rtp.remote_ssrc] =
         ReceiveRtpConfig(config.rtp.extensions, UseSendSideBwe(config));
     audio_receive_streams_.insert(receive_stream);
@@ -683,8 +678,6 @@
     uint32_t ssrc = config.rtp.remote_ssrc;
     receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
         ->RemoveStream(ssrc);
-    size_t num_deleted = audio_rtp_demuxer_.RemoveSink(audio_receive_stream);
-    RTC_DCHECK(num_deleted == 1);
     audio_receive_streams_.erase(audio_receive_stream);
     const std::string& sync_group = audio_receive_stream->config().sync_group;
     const auto it = sync_stream_mapping_.find(sync_group);
@@ -776,19 +769,17 @@
   TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
   RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
 
-  VideoReceiveStream* receive_stream =
-      new VideoReceiveStream(num_cpu_cores_, transport_send_->packet_router(),
-                             std::move(configuration),
-                             module_process_thread_.get(), call_stats_.get());
+  VideoReceiveStream* receive_stream = new VideoReceiveStream(
+      &video_receiver_controller, num_cpu_cores_,
+      transport_send_->packet_router(), std::move(configuration),
+      module_process_thread_.get(), call_stats_.get());
 
   const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
   ReceiveRtpConfig receive_config(config.rtp.extensions,
                                   UseSendSideBwe(config));
   {
     WriteLockScoped write_lock(*receive_crit_);
-    video_rtp_demuxer_.AddSink(config.rtp.remote_ssrc, receive_stream);
     if (config.rtp.rtx_ssrc) {
-      video_rtp_demuxer_.AddSink(config.rtp.rtx_ssrc, receive_stream);
       // We record identical config for the rtx stream as for the main
       // stream. Since the transport_send_cc negotiation is per payload
       // type, we may get an incorrect value for the rtx stream, but
@@ -817,8 +808,6 @@
     WriteLockScoped write_lock(*receive_crit_);
     // Remove all ssrcs pointing to a receive stream. As RTX retransmits on a
     // separate SSRC there can be either one or two.
-    size_t num_deleted = video_rtp_demuxer_.RemoveSink(receive_stream_impl);
-    RTC_DCHECK_GE(num_deleted, 1);
     receive_rtp_config_.erase(config.rtp.remote_ssrc);
     if (config.rtp.rtx_ssrc) {
       receive_rtp_config_.erase(config.rtp.rtx_ssrc);
@@ -840,16 +829,22 @@
   RTC_DCHECK_RUN_ON(&configuration_thread_checker_);
 
   RecoveredPacketReceiver* recovered_packet_receiver = this;
-  FlexfecReceiveStreamImpl* receive_stream = new FlexfecReceiveStreamImpl(
-      config, recovered_packet_receiver, call_stats_->rtcp_rtt_stats(),
-      module_process_thread_.get());
 
+  FlexfecReceiveStreamImpl* receive_stream;
   {
     WriteLockScoped write_lock(*receive_crit_);
-    video_rtp_demuxer_.AddSink(config.remote_ssrc, receive_stream);
-
-    for (auto ssrc : config.protected_media_ssrcs)
-      video_rtp_demuxer_.AddSink(ssrc, receive_stream);
+    // Unlike the video and audio receive streams,
+    // FlexfecReceiveStream implements RtpPacketSinkInterface itself,
+    // and hence its constructor passes its |this| pointer to
+    // video_receiver_controller->CreateStream(). Calling the
+    // constructor while holding |receive_crit_| ensures that we don't
+    // call OnRtpPacket until the constructor is finished and the
+    // object is in a valid state.
+    // TODO(nisse): Fix constructor so that it can be moved outside of
+    // this locked scope.
+    receive_stream = new FlexfecReceiveStreamImpl(
+        &video_receiver_controller, config, recovered_packet_receiver,
+        call_stats_->rtcp_rtt_stats(), module_process_thread_.get());
 
     RTC_DCHECK(receive_rtp_config_.find(config.remote_ssrc) ==
                receive_rtp_config_.end());
@@ -881,7 +876,6 @@
 
     // Remove all SSRCs pointing to the FlexfecReceiveStreamImpl to be
     // destroyed.
-    video_rtp_demuxer_.RemoveSink(receive_stream_impl);
     receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
         ->RemoveStream(ssrc);
   }
@@ -1302,17 +1296,31 @@
   if (!parsed_packet)
     return DELIVERY_PACKET_ERROR;
 
+  auto it = receive_rtp_config_.find(parsed_packet->Ssrc());
+  if (it == receive_rtp_config_.end()) {
+    LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
+                  << parsed_packet->Ssrc();
+    // Destruction of the receive stream, including deregistering from the
+    // RtpDemuxer, is not protected by the |receive_crit_| lock. But
+    // deregistering in the |receive_rtp_config_| map is protected by that lock.
+    // So by not passing the packet on to demuxing in this case, we prevent
+    // incoming packets to be passed on via the demuxer to a receive stream
+    // which is being torned down.
+    return DELIVERY_UNKNOWN_SSRC;
+  }
+  parsed_packet->IdentifyExtensions(it->second.extensions);
+
   NotifyBweOfReceivedPacket(*parsed_packet, media_type);
 
   if (media_type == MediaType::AUDIO) {
-    if (audio_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
+    if (audio_receiver_controller.OnRtpPacket(*parsed_packet)) {
       received_bytes_per_second_counter_.Add(static_cast<int>(length));
       received_audio_bytes_per_second_counter_.Add(static_cast<int>(length));
       event_log_->LogRtpHeader(kIncomingPacket, packet, length);
       return DELIVERY_OK;
     }
   } else if (media_type == MediaType::VIDEO) {
-    if (video_rtp_demuxer_.OnRtpPacket(*parsed_packet)) {
+    if (video_receiver_controller.OnRtpPacket(*parsed_packet)) {
       received_bytes_per_second_counter_.Add(static_cast<int>(length));
       received_video_bytes_per_second_counter_.Add(static_cast<int>(length));
       event_log_->LogRtpHeader(kIncomingPacket, packet, length);
@@ -1348,7 +1356,7 @@
 
   parsed_packet->set_recovered(true);
 
-  video_rtp_demuxer_.OnRtpPacket(*parsed_packet);
+  video_receiver_controller.OnRtpPacket(*parsed_packet);
 }
 
 void Call::NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
diff --git a/webrtc/call/flexfec_receive_stream_impl.cc b/webrtc/call/flexfec_receive_stream_impl.cc
index f010433..c73d9e9 100644
--- a/webrtc/call/flexfec_receive_stream_impl.cc
+++ b/webrtc/call/flexfec_receive_stream_impl.cc
@@ -15,6 +15,7 @@
 #include "webrtc/base/checks.h"
 #include "webrtc/base/location.h"
 #include "webrtc/base/logging.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
 #include "webrtc/modules/rtp_rtcp/include/receive_statistics.h"
 #include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
@@ -122,6 +123,7 @@
 }  // namespace
 
 FlexfecReceiveStreamImpl::FlexfecReceiveStreamImpl(
+    RtpStreamReceiverControllerInterface* receiver_controller,
     const Config& config,
     RecoveredPacketReceiver* recovered_packet_receiver,
     RtcpRttStats* rtt_stats,
@@ -141,6 +143,22 @@
   rtp_rtcp_->SetRTCPStatus(config_.rtcp_mode);
   rtp_rtcp_->SetSSRC(config_.local_ssrc);
   process_thread_->RegisterModule(rtp_rtcp_.get(), RTC_FROM_HERE);
+
+  // Register with transport.
+  // TODO(nisse): OnRtpPacket in this class delegates all real work to
+  // |receiver_|. So maybe we don't need to implement RtpPacketSinkInterface
+  // here at all, we'd then delete the OnRtpPacket method and instead register
+  // |receiver_| as the RtpPacketSinkInterface for this stream.
+  // TODO(nisse): Passing |this| from the constructor to the RtpDemuxer, before
+  // the object is fully initialized, is risky. But it works in this case
+  // because locking in our caller, Call::CreateFlexfecReceiveStream, ensures
+  // that the demuxer doesn't call OnRtpPacket before this object is fully
+  // constructed. Registering |receiver_| instead of |this| would solve this
+  // problem too.
+  rtp_stream_receiver_ =
+      receiver_controller->CreateReceiver(config_.remote_ssrc, this);
+  for (uint32_t ssrc : config.protected_media_ssrcs)
+    receiver_controller->AddSink(ssrc, this);
 }
 
 FlexfecReceiveStreamImpl::~FlexfecReceiveStreamImpl() {
diff --git a/webrtc/call/flexfec_receive_stream_impl.h b/webrtc/call/flexfec_receive_stream_impl.h
index e4c2294..a89940f 100644
--- a/webrtc/call/flexfec_receive_stream_impl.h
+++ b/webrtc/call/flexfec_receive_stream_impl.h
@@ -26,14 +26,18 @@
 class RtcpRttStats;
 class RtpPacketReceived;
 class RtpRtcp;
+class RtpStreamReceiverControllerInterface;
+class RtpStreamReceiverInterface;
 
 class FlexfecReceiveStreamImpl : public FlexfecReceiveStream,
                                  public RtpPacketSinkInterface {
  public:
-  FlexfecReceiveStreamImpl(const Config& config,
-                           RecoveredPacketReceiver* recovered_packet_receiver,
-                           RtcpRttStats* rtt_stats,
-                           ProcessThread* process_thread);
+  FlexfecReceiveStreamImpl(
+      RtpStreamReceiverControllerInterface* receiver_controller,
+      const Config& config,
+      RecoveredPacketReceiver* recovered_packet_receiver,
+      RtcpRttStats* rtt_stats,
+      ProcessThread* process_thread);
   ~FlexfecReceiveStreamImpl() override;
 
   const Config& GetConfig() const { return config_; }
@@ -59,6 +63,8 @@
   const std::unique_ptr<ReceiveStatistics> rtp_receive_statistics_;
   const std::unique_ptr<RtpRtcp> rtp_rtcp_;
   ProcessThread* process_thread_;
+
+  std::unique_ptr<RtpStreamReceiverInterface> rtp_stream_receiver_;
 };
 
 }  // namespace webrtc
diff --git a/webrtc/call/flexfec_receive_stream_unittest.cc b/webrtc/call/flexfec_receive_stream_unittest.cc
index 46bd7c3..ba41406 100644
--- a/webrtc/call/flexfec_receive_stream_unittest.cc
+++ b/webrtc/call/flexfec_receive_stream_unittest.cc
@@ -12,6 +12,7 @@
 
 #include "webrtc/base/array_view.h"
 #include "webrtc/call/flexfec_receive_stream_impl.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
 #include "webrtc/modules/pacing/packet_router.h"
 #include "webrtc/modules/rtp_rtcp/include/flexfec_receiver.h"
 #include "webrtc/modules/rtp_rtcp/mocks/mock_recovered_packet_receiver.h"
@@ -77,7 +78,8 @@
  protected:
   FlexfecReceiveStreamTest()
       : config_(CreateDefaultConfig(&rtcp_send_transport_)),
-        receive_stream_(config_,
+        receive_stream_(&rtp_stream_receiver_controller_,
+                        config_,
                         &recovered_packet_receiver_,
                         &rtt_stats_,
                         &process_thread_) {}
@@ -87,7 +89,7 @@
   MockRecoveredPacketReceiver recovered_packet_receiver_;
   MockRtcpRttStats rtt_stats_;
   MockProcessThread process_thread_;
-
+  RtpStreamReceiverController rtp_stream_receiver_controller_;
   FlexfecReceiveStreamImpl receive_stream_;
 };
 
@@ -134,7 +136,8 @@
   // clang-format on
 
   testing::StrictMock<MockRecoveredPacketReceiver> recovered_packet_receiver;
-  FlexfecReceiveStreamImpl receive_stream(config_, &recovered_packet_receiver,
+  FlexfecReceiveStreamImpl receive_stream(&rtp_stream_receiver_controller_,
+                                          config_, &recovered_packet_receiver,
                                           &rtt_stats_, &process_thread_);
 
   // Do not call back before being started.
diff --git a/webrtc/call/rtp_stream_receiver_controller.cc b/webrtc/call/rtp_stream_receiver_controller.cc
new file mode 100644
index 0000000..a4b1e36
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller.cc
@@ -0,0 +1,58 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtp_stream_receiver_controller.h"
+#include "webrtc/base/ptr_util.h"
+
+namespace webrtc {
+
+RtpStreamReceiverController::Receiver::Receiver(
+    RtpStreamReceiverController* controller,
+    uint32_t ssrc,
+    RtpPacketSinkInterface* sink)
+    : controller_(controller), sink_(sink) {
+  controller_->AddSink(ssrc, sink_);
+}
+
+RtpStreamReceiverController::Receiver::~Receiver() {
+  // Don't require return value > 0, since for RTX we currently may
+  // have multiple Receiver objects with the same sink.
+  // TODO(nisse): Consider adding a DCHECK when RtxReceiveStream is wired up.
+  controller_->RemoveSink(sink_);
+}
+
+RtpStreamReceiverController::RtpStreamReceiverController() = default;
+RtpStreamReceiverController::~RtpStreamReceiverController() = default;
+
+std::unique_ptr<RtpStreamReceiverInterface>
+RtpStreamReceiverController::CreateReceiver(
+    uint32_t ssrc,
+    RtpPacketSinkInterface* sink) {
+  return rtc::MakeUnique<Receiver>(this, ssrc, sink);
+}
+
+bool RtpStreamReceiverController::OnRtpPacket(const RtpPacketReceived& packet) {
+  rtc::CritScope cs(&lock_);
+  return demuxer_.OnRtpPacket(packet);
+}
+
+void RtpStreamReceiverController::AddSink(uint32_t ssrc,
+                                          RtpPacketSinkInterface* sink) {
+  rtc::CritScope cs(&lock_);
+  return demuxer_.AddSink(ssrc, sink);
+}
+
+size_t RtpStreamReceiverController::RemoveSink(
+    const RtpPacketSinkInterface* sink) {
+  rtc::CritScope cs(&lock_);
+  return demuxer_.RemoveSink(sink);
+}
+
+}  // namespace webrtc
diff --git a/webrtc/call/rtp_stream_receiver_controller.h b/webrtc/call/rtp_stream_receiver_controller.h
new file mode 100644
index 0000000..5c8ed67
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller.h
@@ -0,0 +1,72 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
+#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
+
+#include <memory>
+
+#include "webrtc/base/criticalsection.h"
+#include "webrtc/call/rtp_demuxer.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
+
+namespace webrtc {
+
+class RtpPacketReceived;
+
+// This class represents the RTP receive parsing and demuxing, for a
+// single RTP session.
+// TODO(nisse): Add RTCP processing, we should aim to terminate RTCP
+// and not leave any RTCP processing to individual receive streams.
+// TODO(nisse): Extract per-packet processing, including parsing and
+// demuxing, into a separate class.
+class RtpStreamReceiverController
+    : public RtpStreamReceiverControllerInterface {
+ public:
+  RtpStreamReceiverController();
+  ~RtpStreamReceiverController() override;
+
+  // Implements RtpStreamReceiverControllerInterface.
+  std::unique_ptr<RtpStreamReceiverInterface> CreateReceiver(
+      uint32_t ssrc,
+      RtpPacketSinkInterface* sink) override;
+
+  // Thread-safe wrappers for the corresponding RtpDemuxer methods.
+  void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) override;
+  size_t RemoveSink(const RtpPacketSinkInterface* sink) override;
+
+  // TODO(nisse): Not yet responsible for parsing.
+  bool OnRtpPacket(const RtpPacketReceived& packet);
+
+ private:
+  class Receiver : public RtpStreamReceiverInterface {
+   public:
+    Receiver(RtpStreamReceiverController* controller,
+             uint32_t ssrc,
+             RtpPacketSinkInterface* sink);
+
+    ~Receiver() override;
+
+   private:
+    RtpStreamReceiverController* const controller_;
+    RtpPacketSinkInterface* const sink_;
+  };
+
+  // TODO(nisse): Move to a TaskQueue for synchronization. When used
+  // by Call, we expect construction and all methods but OnRtpPacket
+  // to be called on the same thread, and OnRtpPacket to be called
+  // by a single, but possibly distinct, thread. But applications not
+  // using Call may have use threads differently.
+  rtc::CriticalSection lock_;
+  RtpDemuxer demuxer_ GUARDED_BY(&lock_);
+};
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_H_
diff --git a/webrtc/call/rtp_stream_receiver_controller_interface.h b/webrtc/call/rtp_stream_receiver_controller_interface.h
new file mode 100644
index 0000000..51d25a5
--- /dev/null
+++ b/webrtc/call/rtp_stream_receiver_controller_interface.h
@@ -0,0 +1,47 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_
+#define WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_
+
+#include <memory>
+
+#include "webrtc/call/rtp_packet_sink_interface.h"
+
+namespace webrtc {
+
+// An RtpStreamReceiver is responsible for the rtp-specific but
+// media-independent state needed for receiving an RTP stream.
+// TODO(nisse): Currently, only owns the association between ssrc and
+// the stream's RtpPacketSinkInterface. Ownership of corresponding
+// objects from modules/rtp_rtcp/ should move to this class (or
+// rather, the corresponding implementation class). We should add
+// methods for getting rtp receive stats, and for sending RTCP
+// messages related to the receive stream.
+class RtpStreamReceiverInterface {
+ public:
+  virtual ~RtpStreamReceiverInterface() {}
+};
+
+// This class acts as a factory for RtpStreamReceiver objects.
+class RtpStreamReceiverControllerInterface {
+ public:
+  virtual ~RtpStreamReceiverControllerInterface() {}
+
+  virtual std::unique_ptr<RtpStreamReceiverInterface> CreateReceiver(
+      uint32_t ssrc,
+      RtpPacketSinkInterface* sink) = 0;
+  // For registering additional sinks, needed for FlexFEC.
+  virtual void AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) = 0;
+  virtual size_t RemoveSink(const RtpPacketSinkInterface* sink) = 0;
+};
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RTP_STREAM_RECEIVER_CONTROLLER_INTERFACE_H_
diff --git a/webrtc/video/BUILD.gn b/webrtc/video/BUILD.gn
index da31bca..58f58dc 100644
--- a/webrtc/video/BUILD.gn
+++ b/webrtc/video/BUILD.gn
@@ -260,6 +260,7 @@
       "../base:rtc_base_approved",
       "../base:rtc_base_tests_utils",
       "../call:call_interfaces",
+      "../call:rtp_receiver",
       "../common_video",
       "../logging:rtc_event_log_api",
       "../media:rtc_media",
diff --git a/webrtc/video/rtp_video_stream_receiver.h b/webrtc/video/rtp_video_stream_receiver.h
index 736f513..924f1be 100644
--- a/webrtc/video/rtp_video_stream_receiver.h
+++ b/webrtc/video/rtp_video_stream_receiver.h
@@ -19,6 +19,7 @@
 
 #include "webrtc/base/constructormagic.h"
 #include "webrtc/base/criticalsection.h"
+#include "webrtc/call/rtp_packet_sink_interface.h"
 #include "webrtc/modules/include/module_common_types.h"
 #include "webrtc/modules/rtp_rtcp/include/receive_statistics.h"
 #include "webrtc/modules/rtp_rtcp/include/remote_ntp_time_estimator.h"
@@ -58,6 +59,7 @@
 class RtpVideoStreamReceiver : public RtpData,
                                public RecoveredPacketReceiver,
                                public RtpFeedback,
+                               public RtpPacketSinkInterface,
                                public VCMFrameTypeCallback,
                                public VCMPacketRequestCallback,
                                public video_coding::OnReceivedFrameCallback,
@@ -96,8 +98,8 @@
 
   void SignalNetworkState(NetworkState state);
 
-  // TODO(nisse): Intended to be part of an RtpPacketReceiver interface.
-  void OnRtpPacket(const RtpPacketReceived& packet);
+  // Implements RtpPacketSinkInterface.
+  void OnRtpPacket(const RtpPacketReceived& packet) override;
 
   // Implements RtpData.
   int32_t OnReceivedPayloadData(const uint8_t* payload_data,
diff --git a/webrtc/video/video_receive_stream.cc b/webrtc/video/video_receive_stream.cc
index 9eceb00..acc497b 100644
--- a/webrtc/video/video_receive_stream.cc
+++ b/webrtc/video/video_receive_stream.cc
@@ -21,6 +21,7 @@
 #include "webrtc/base/logging.h"
 #include "webrtc/base/optional.h"
 #include "webrtc/base/trace_event.h"
+#include "webrtc/call/rtp_stream_receiver_controller_interface.h"
 #include "webrtc/common_types.h"
 #include "webrtc/common_video/h264/profile_level_id.h"
 #include "webrtc/common_video/libyuv/include/webrtc_libyuv.h"
@@ -168,11 +169,13 @@
 
 namespace internal {
 
-VideoReceiveStream::VideoReceiveStream(int num_cpu_cores,
-                                       PacketRouter* packet_router,
-                                       VideoReceiveStream::Config config,
-                                       ProcessThread* process_thread,
-                                       CallStats* call_stats)
+VideoReceiveStream::VideoReceiveStream(
+    RtpStreamReceiverControllerInterface* receiver_controller,
+    int num_cpu_cores,
+    PacketRouter* packet_router,
+    VideoReceiveStream::Config config,
+    ProcessThread* process_thread,
+    CallStats* call_stats)
     : transport_adapter_(config.rtcp_send_transport),
       config_(std::move(config)),
       num_cpu_cores_(num_cpu_cores),
@@ -222,6 +225,14 @@
       clock_, jitter_estimator_.get(), timing_.get(), &stats_proxy_));
 
   process_thread_->RegisterModule(&rtp_stream_sync_, RTC_FROM_HERE);
+
+  // Register with RtpStreamReceiverController.
+  media_receiver_ = receiver_controller->CreateReceiver(
+      config_.rtp.remote_ssrc, &rtp_video_stream_receiver_);
+  if (config.rtp.rtx_ssrc) {
+    rtx_receiver_ = receiver_controller->CreateReceiver(
+        config_.rtp.rtx_ssrc, &rtp_video_stream_receiver_);
+  }
 }
 
 VideoReceiveStream::~VideoReceiveStream() {
@@ -241,10 +252,6 @@
   return rtp_video_stream_receiver_.DeliverRtcp(packet, length);
 }
 
-void VideoReceiveStream::OnRtpPacket(const RtpPacketReceived& packet) {
-  rtp_video_stream_receiver_.OnRtpPacket(packet);
-}
-
 void VideoReceiveStream::SetSync(Syncable* audio_syncable) {
   RTC_DCHECK_RUN_ON(&worker_thread_checker_);
   rtp_stream_sync_.ConfigureSync(audio_syncable);
diff --git a/webrtc/video/video_receive_stream.h b/webrtc/video/video_receive_stream.h
index 6020d3f..a32573d 100644
--- a/webrtc/video/video_receive_stream.h
+++ b/webrtc/video/video_receive_stream.h
@@ -36,6 +36,8 @@
 class IvfFileWriter;
 class ProcessThread;
 class RTPFragmentationHeader;
+class RtpStreamReceiverInterface;
+class RtpStreamReceiverControllerInterface;
 class VCMTiming;
 class VCMJitterEstimator;
 
@@ -47,10 +49,10 @@
                            public NackSender,
                            public KeyFrameRequestSender,
                            public video_coding::OnCompleteFrameCallback,
-                           public Syncable,
-                           public RtpPacketSinkInterface {
+                           public Syncable {
  public:
-  VideoReceiveStream(int num_cpu_cores,
+  VideoReceiveStream(RtpStreamReceiverControllerInterface* receiver_controller,
+                     int num_cpu_cores,
                      PacketRouter* packet_router,
                      VideoReceiveStream::Config config,
                      ProcessThread* process_thread,
@@ -78,9 +80,6 @@
   void EnableEncodedFrameRecording(rtc::PlatformFile file,
                                    size_t byte_limit) override;
 
-  // RtpPacketSinkInterface.
-  void OnRtpPacket(const RtpPacketReceived& packet) override;
-
   // Implements rtc::VideoSinkInterface<VideoFrame>.
   void OnFrame(const VideoFrame& video_frame) override;
 
@@ -137,6 +136,9 @@
   // Members for the new jitter buffer experiment.
   std::unique_ptr<VCMJitterEstimator> jitter_estimator_;
   std::unique_ptr<video_coding::FrameBuffer> frame_buffer_;
+
+  std::unique_ptr<RtpStreamReceiverInterface> media_receiver_;
+  std::unique_ptr<RtpStreamReceiverInterface> rtx_receiver_;
 };
 }  // namespace internal
 }  // namespace webrtc
diff --git a/webrtc/video/video_receive_stream_unittest.cc b/webrtc/video/video_receive_stream_unittest.cc
index 237eed4..a1b644e 100644
--- a/webrtc/video/video_receive_stream_unittest.cc
+++ b/webrtc/video/video_receive_stream_unittest.cc
@@ -16,6 +16,7 @@
 #include "webrtc/api/video_codecs/video_decoder.h"
 #include "webrtc/base/criticalsection.h"
 #include "webrtc/base/event.h"
+#include "webrtc/call/rtp_stream_receiver_controller.h"
 #include "webrtc/media/base/fakevideorenderer.h"
 #include "webrtc/modules/pacing/packet_router.h"
 #include "webrtc/modules/rtp_rtcp/source/rtp_packet_to_send.h"
@@ -25,15 +26,14 @@
 #include "webrtc/system_wrappers/include/clock.h"
 #include "webrtc/test/field_trial.h"
 
+namespace webrtc {
+namespace {
+
 using testing::_;
 using testing::Invoke;
 
 constexpr int kDefaultTimeOutMs = 50;
 
-namespace webrtc {
-
-namespace {
-
 const char kNewJitterBufferFieldTrialEnabled[] =
     "WebRTC-NewVideoJitterBuffer/Enabled/";
 
@@ -91,7 +91,7 @@
     config_.decoders.push_back(null_decoder);
 
     video_receive_stream_.reset(new webrtc::internal::VideoReceiveStream(
-        kDefaultNumCpuCores,
+        &rtp_stream_receiver_controller_, kDefaultNumCpuCores,
         &packet_router_, config_.Copy(), process_thread_.get(), &call_stats_));
   }
 
@@ -105,6 +105,7 @@
   MockTransport mock_transport_;
   PacketRouter packet_router_;
   std::unique_ptr<ProcessThread> process_thread_;
+  RtpStreamReceiverController rtp_stream_receiver_controller_;
   std::unique_ptr<webrtc::internal::VideoReceiveStream> video_receive_stream_;
 };
 
@@ -130,9 +131,10 @@
   EXPECT_CALL(mock_h264_video_decoder_, Decode(_, false, _, _, _));
   RtpPacketReceived parsed_packet;
   ASSERT_TRUE(parsed_packet.Parse(rtppacket.data(), rtppacket.size()));
-  video_receive_stream_->OnRtpPacket(parsed_packet);
+  rtp_stream_receiver_controller_.OnRtpPacket(parsed_packet);
   EXPECT_CALL(mock_h264_video_decoder_, Release());
   // Make sure the decoder thread had a chance to run.
   init_decode_event_.Wait(kDefaultTimeOutMs);
 }
+
 }  // namespace webrtc
diff --git a/webrtc/voice_engine/channel_proxy.h b/webrtc/voice_engine/channel_proxy.h
index 35d0df5..f5417f2 100644
--- a/webrtc/voice_engine/channel_proxy.h
+++ b/webrtc/voice_engine/channel_proxy.h
@@ -17,6 +17,7 @@
 #include "webrtc/base/constructormagic.h"
 #include "webrtc/base/race_checker.h"
 #include "webrtc/base/thread_checker.h"
+#include "webrtc/call/rtp_packet_sink_interface.h"
 #include "webrtc/voice_engine/channel_manager.h"
 #include "webrtc/voice_engine/include/voe_rtp_rtcp.h"
 
@@ -50,7 +51,7 @@
 //     voe::Channel class.
 //  2. Provide a refined interface for the stream classes, including assumptions
 //     on return values and input adaptation.
-class ChannelProxy {
+class ChannelProxy : public RtpPacketSinkInterface {
  public:
   ChannelProxy();
   explicit ChannelProxy(const ChannelOwner& channel_owner);
@@ -94,7 +95,9 @@
   virtual void SetInputMute(bool muted);
   virtual void RegisterExternalTransport(Transport* transport);
   virtual void DeRegisterExternalTransport();
-  virtual void OnRtpPacket(const RtpPacketReceived& packet);
+
+  // Implements RtpPacketSinkInterface
+  void OnRtpPacket(const RtpPacketReceived& packet) override;
   virtual bool ReceivedRTCPPacket(const uint8_t* packet, size_t length);
   virtual const rtc::scoped_refptr<AudioDecoderFactory>&
       GetAudioDecoderFactory() const;