Create RtcpDemuxer. Capabilities:
1. Demux RTCP messages according to the sender-SSRC.
2. Demux RTCP messages according to the RSID (resolved to an SSRC, then compared to the sender-RTCP).
3. Allow listening in on all RTCP messages passing through the demuxer ("broadcast sinks").

BUG=webrtc:7135

Review-Url: https://codereview.webrtc.org/2943693003
Cr-Original-Commit-Position: refs/heads/master@{#18763}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: cb83bdf01f2ec8b9ed254991edc2be053c9eed24
diff --git a/call/BUILD.gn b/call/BUILD.gn
index aa98053..8f0f38b 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -37,16 +37,25 @@
 # when interfaces have stabilized.
 rtc_source_set("rtp_interfaces") {
   sources = [
+    "rtcp_packet_sink_interface.h",
     "rtp_packet_sink_interface.h",
     "rtp_stream_receiver_controller_interface.h",
     "rtp_transport_controller_send_interface.h",
   ]
+  deps = [
+    "//webrtc/base:rtc_base_approved",
+  ]
 }
 
 rtc_source_set("rtp_receiver") {
   sources = [
+    "rsid_resolution_observer.h",
+    "rtcp_demuxer.cc",
+    "rtcp_demuxer.h",
     "rtp_demuxer.cc",
     "rtp_demuxer.h",
+    "rtp_rtcp_demuxer_helper.cc",
+    "rtp_rtcp_demuxer_helper.h",
     "rtp_stream_receiver_controller.cc",
     "rtp_stream_receiver_controller.h",
     "rtx_receive_stream.cc",
@@ -54,8 +63,9 @@
   ]
   deps = [
     ":rtp_interfaces",
-    "../base:rtc_base_approved",
     "../modules/rtp_rtcp",
+    "//webrtc:webrtc_common",
+    "//webrtc/base:rtc_base_approved",
   ]
 }
 
@@ -127,7 +137,9 @@
       "bitrate_estimator_tests.cc",
       "call_unittest.cc",
       "flexfec_receive_stream_unittest.cc",
+      "rtcp_demuxer_unittest.cc",
       "rtp_demuxer_unittest.cc",
+      "rtp_rtcp_demuxer_helper_unittest.cc",
       "rtx_receive_stream_unittest.cc",
     ]
     deps = [
@@ -153,6 +165,7 @@
       "../test:video_test_common",
       "//testing/gmock",
       "//testing/gtest",
+      "//webrtc:webrtc_common",
     ]
     if (!build_with_chromium && is_clang) {
       # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
diff --git a/call/rsid_resolution_observer.h b/call/rsid_resolution_observer.h
new file mode 100644
index 0000000..b14fa9d
--- /dev/null
+++ b/call/rsid_resolution_observer.h
@@ -0,0 +1,30 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RSID_RESOLUTION_OBSERVER_H_
+#define WEBRTC_CALL_RSID_RESOLUTION_OBSERVER_H_
+
+#include <string>
+
+#include "webrtc/base/basictypes.h"
+
+namespace webrtc {
+
+// One RSID can be associated with one, and only one, SSRC, throughout a call.
+// The resolution might either happen during call setup, or during the call.
+class RsidResolutionObserver {
+ public:
+  virtual ~RsidResolutionObserver() = default;
+
+  virtual void OnRsidResolved(const std::string& rsid, uint32_t ssrc) = 0;
+};
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RSID_RESOLUTION_OBSERVER_H_
diff --git a/call/rtcp_demuxer.cc b/call/rtcp_demuxer.cc
new file mode 100644
index 0000000..6054fc0
--- /dev/null
+++ b/call/rtcp_demuxer.cc
@@ -0,0 +1,100 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtcp_demuxer.h"
+
+#include "webrtc/base/checks.h"
+#include "webrtc/call/rtcp_packet_sink_interface.h"
+#include "webrtc/call/rtp_rtcp_demuxer_helper.h"
+#include "webrtc/common_types.h"
+
+namespace webrtc {
+
+RtcpDemuxer::RtcpDemuxer() = default;
+
+RtcpDemuxer::~RtcpDemuxer() {
+  RTC_DCHECK(ssrc_sinks_.empty());
+  RTC_DCHECK(rsid_sinks_.empty());
+  RTC_DCHECK(broadcast_sinks_.empty());
+}
+
+void RtcpDemuxer::AddSink(uint32_t sender_ssrc, RtcpPacketSinkInterface* sink) {
+  RTC_DCHECK(sink);
+  RTC_DCHECK(!ContainerHasKey(broadcast_sinks_, sink));
+  RTC_DCHECK(!MultimapAssociationExists(ssrc_sinks_, sender_ssrc, sink));
+  ssrc_sinks_.emplace(sender_ssrc, sink);
+}
+
+void RtcpDemuxer::AddSink(const std::string& rsid,
+                          RtcpPacketSinkInterface* sink) {
+  RTC_DCHECK(StreamId::IsLegalName(rsid));
+  RTC_DCHECK(sink);
+  RTC_DCHECK(!ContainerHasKey(broadcast_sinks_, sink));
+  RTC_DCHECK(!MultimapAssociationExists(rsid_sinks_, rsid, sink));
+  rsid_sinks_.emplace(rsid, sink);
+}
+
+void RtcpDemuxer::AddBroadcastSink(RtcpPacketSinkInterface* sink) {
+  RTC_DCHECK(sink);
+  RTC_DCHECK(!MultimapHasValue(ssrc_sinks_, sink));
+  RTC_DCHECK(!MultimapHasValue(rsid_sinks_, sink));
+  RTC_DCHECK(!ContainerHasKey(broadcast_sinks_, sink));
+  broadcast_sinks_.push_back(sink);
+}
+
+void RtcpDemuxer::RemoveSink(const RtcpPacketSinkInterface* sink) {
+  RTC_DCHECK(sink);
+  size_t removal_count = RemoveFromMultimapByValue(&ssrc_sinks_, sink) +
+                         RemoveFromMultimapByValue(&rsid_sinks_, sink);
+  RTC_DCHECK_GT(removal_count, 0);
+}
+
+void RtcpDemuxer::RemoveBroadcastSink(const RtcpPacketSinkInterface* sink) {
+  RTC_DCHECK(sink);
+  auto it = std::find(broadcast_sinks_.begin(), broadcast_sinks_.end(), sink);
+  RTC_DCHECK(it != broadcast_sinks_.end());
+  broadcast_sinks_.erase(it);
+}
+
+void RtcpDemuxer::OnRtcpPacket(rtc::ArrayView<const uint8_t> packet) {
+  // Perform sender-SSRC-based demuxing for packets with a sender-SSRC.
+  rtc::Optional<uint32_t> sender_ssrc = ParseRtcpPacketSenderSsrc(packet);
+  if (sender_ssrc) {
+    auto it_range = ssrc_sinks_.equal_range(*sender_ssrc);
+    for (auto it = it_range.first; it != it_range.second; ++it) {
+      it->second->OnRtcpPacket(packet);
+    }
+  }
+
+  // All packets, even those without a sender-SSRC, are broadcast to sinks
+  // which listen to broadcasts.
+  for (RtcpPacketSinkInterface* sink : broadcast_sinks_) {
+    sink->OnRtcpPacket(packet);
+  }
+}
+
+void RtcpDemuxer::OnRsidResolved(const std::string& rsid, uint32_t ssrc) {
+  // Record the new SSRC association for all of the sinks that were associated
+  // with the RSID.
+  auto it_range = rsid_sinks_.equal_range(rsid);
+  for (auto it = it_range.first; it != it_range.second; ++it) {
+    RtcpPacketSinkInterface* sink = it->second;
+    // Watch out for pre-existing SSRC-based associations.
+    if (!MultimapAssociationExists(ssrc_sinks_, ssrc, sink)) {
+      AddSink(ssrc, sink);
+    }
+  }
+
+  // RSIDs are uniquely associated with SSRCs; no need to keep in memory
+  // the RSID-to-sink association of resolved RSIDs.
+  rsid_sinks_.erase(it_range.first, it_range.second);
+}
+
+}  // namespace webrtc
diff --git a/call/rtcp_demuxer.h b/call/rtcp_demuxer.h
new file mode 100644
index 0000000..c5c1621
--- /dev/null
+++ b/call/rtcp_demuxer.h
@@ -0,0 +1,85 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_CALL_RTCP_DEMUXER_H_
+#define WEBRTC_CALL_RTCP_DEMUXER_H_
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "webrtc/base/array_view.h"
+#include "webrtc/base/basictypes.h"
+#include "webrtc/call/rsid_resolution_observer.h"
+
+namespace webrtc {
+
+class RtcpPacketSinkInterface;
+
+// This class represents the RTCP demuxing, for a single RTP session (i.e., one
+// SSRC space, see RFC 7656). It isn't thread aware, leaving responsibility of
+// multithreading issues to the user of this class.
+class RtcpDemuxer : public RsidResolutionObserver {
+ public:
+  RtcpDemuxer();
+  ~RtcpDemuxer() override;
+
+  // Registers a sink. The sink will be notified of incoming RTCP packets with
+  // that sender-SSRC. The same sink can be registered for multiple SSRCs, and
+  // the same SSRC can have multiple sinks. Null pointer is not allowed.
+  // Sinks may be associated with both an SSRC and an RSID.
+  // Sinks may be registered as SSRC/RSID-specific or broadcast, but not both.
+  void AddSink(uint32_t sender_ssrc, RtcpPacketSinkInterface* sink);
+
+  // Registers a sink. Once the RSID is resolved to an SSRC, the sink will be
+  // notified of all RTCP packets with that sender-SSRC.
+  // The same sink can be registered for multiple RSIDs, and
+  // the same RSID can have multiple sinks. Null pointer is not allowed.
+  // Sinks may be associated with both an SSRC and an RSID.
+  // Sinks may be registered as SSRC/RSID-specific or broadcast, but not both.
+  void AddSink(const std::string& rsid, RtcpPacketSinkInterface* sink);
+
+  // Registers a sink. The sink will be notified of any incoming RTCP packet.
+  // Null pointer is not allowed.
+  // Sinks may be registered as SSRC/RSID-specific or broadcast, but not both.
+  void AddBroadcastSink(RtcpPacketSinkInterface* sink);
+
+  // Undo previous AddSink() calls with the given sink.
+  void RemoveSink(const RtcpPacketSinkInterface* sink);
+
+  // Undo AddBroadcastSink().
+  void RemoveBroadcastSink(const RtcpPacketSinkInterface* sink);
+
+  // Process a new RTCP packet and forward it to the appropriate sinks.
+  void OnRtcpPacket(rtc::ArrayView<const uint8_t> packet);
+
+  // Implement RsidResolutionObserver - become notified whenever RSIDs resolve
+  // to an SSRC.
+  void OnRsidResolved(const std::string& rsid, uint32_t ssrc) override;
+
+  // TODO(eladalon): Add the ability to resolve RSIDs and inform observers,
+  // like in the RtpDemuxer case, once the relevant standard is finalized.
+
+ private:
+  // Records the association SSRCs to sinks.
+  std::multimap<uint32_t, RtcpPacketSinkInterface*> ssrc_sinks_;
+
+  // Records the association RSIDs to sinks.
+  std::multimap<std::string, RtcpPacketSinkInterface*> rsid_sinks_;
+
+  // Sinks which will receive notifications of all incoming RTCP packets.
+  // Additional/removal of sinks is expected to be significantly less frequent
+  // than RTCP message reception; container chosen for iteration performance.
+  std::vector<RtcpPacketSinkInterface*> broadcast_sinks_;
+};
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RTCP_DEMUXER_H_
diff --git a/call/rtcp_demuxer_unittest.cc b/call/rtcp_demuxer_unittest.cc
new file mode 100644
index 0000000..49f037c
--- /dev/null
+++ b/call/rtcp_demuxer_unittest.cc
@@ -0,0 +1,584 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtcp_demuxer.h"
+
+#include <memory>
+
+#include "webrtc/base/arraysize.h"
+#include "webrtc/base/basictypes.h"
+#include "webrtc/base/checks.h"
+#include "webrtc/base/ptr_util.h"
+#include "webrtc/call/rtcp_packet_sink_interface.h"
+#include "webrtc/common_types.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/bye.h"
+#include "webrtc/test/gmock.h"
+#include "webrtc/test/gtest.h"
+
+namespace webrtc {
+
+namespace {
+
+using ::testing::_;
+using ::testing::AtLeast;
+using ::testing::ElementsAreArray;
+using ::testing::InSequence;
+using ::testing::NiceMock;
+
+class MockRtcpPacketSink : public RtcpPacketSinkInterface {
+ public:
+  MOCK_METHOD1(OnRtcpPacket, void(rtc::ArrayView<const uint8_t>));
+};
+
+// Produces a packet buffer representing an RTCP packet with a given SSRC,
+// as it would look when sent over the wire.
+// |distinguishing_string| allows different RTCP packets with the same SSRC
+// to be distinguished. How this is set into the actual packet is
+// unimportant, and depends on which RTCP message we choose to use.
+rtc::Buffer CreateRtcpPacket(uint32_t ssrc,
+                             const std::string& distinguishing_string = "") {
+  rtcp::Bye packet;
+  packet.SetSenderSsrc(ssrc);
+  if (distinguishing_string != "") {
+    // Actual way we use |distinguishing_string| is unimportant, so long
+    // as it ends up in the packet.
+    packet.SetReason(distinguishing_string);
+  }
+  return packet.Build();
+}
+
+}  // namespace
+
+TEST(RtcpDemuxerTest, OnRtcpPacketCalledOnCorrectSinkBySsrc) {
+  RtcpDemuxer demuxer;
+
+  constexpr uint32_t ssrcs[] = {101, 202, 303};
+  MockRtcpPacketSink sinks[arraysize(ssrcs)];
+  for (size_t i = 0; i < arraysize(ssrcs); i++) {
+    demuxer.AddSink(ssrcs[i], &sinks[i]);
+  }
+
+  for (size_t i = 0; i < arraysize(ssrcs); i++) {
+    auto packet = CreateRtcpPacket(ssrcs[i]);
+    EXPECT_CALL(sinks[i],
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+    demuxer.OnRtcpPacket(packet);
+  }
+
+  // Test tear-down
+  for (const auto& sink : sinks) {
+    demuxer.RemoveSink(&sink);
+  }
+}
+
+TEST(RtcpDemuxerTest, OnRtcpPacketCalledOnResolvedRsidSink) {
+  RtcpDemuxer demuxer;
+
+  // Set up some RSID sinks.
+  const std::string rsids[] = {"a", "b", "c"};
+  MockRtcpPacketSink sinks[arraysize(rsids)];
+  for (size_t i = 0; i < arraysize(rsids); i++) {
+    demuxer.AddSink(rsids[i], &sinks[i]);
+  }
+
+  // Only resolve one of the sinks.
+  constexpr size_t resolved_sink_index = 0;
+  constexpr uint32_t ssrc = 345;
+  demuxer.OnRsidResolved(rsids[resolved_sink_index], ssrc);
+
+  // The resolved sink gets notifications of RTCP messages with its SSRC.
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sinks[resolved_sink_index],
+              OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+      .Times(1);
+
+  // RTCP received; expected calls triggered.
+  demuxer.OnRtcpPacket(packet);
+
+  // Test tear-down
+  for (const auto& sink : sinks) {
+    demuxer.RemoveSink(&sink);
+  }
+}
+
+TEST(RtcpDemuxerTest,
+     SingleCallbackAfterResolutionOfAnRsidToAlreadyRegisteredSsrc) {
+  RtcpDemuxer demuxer;
+
+  // Associate a sink with an SSRC.
+  MockRtcpPacketSink sink;
+  constexpr uint32_t ssrc = 999;
+  demuxer.AddSink(ssrc, &sink);
+
+  // Associate the same sink with an RSID.
+  const std::string rsid = "r";
+  demuxer.AddSink(rsid, &sink);
+
+  // Resolve the RSID to the aforementioned SSRC.
+  demuxer.OnRsidResolved(rsid, ssrc);
+
+  // OnRtcpPacket still called only a single time for messages with this SSRC.
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink,
+              OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+      .Times(1);
+  demuxer.OnRtcpPacket(packet);
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, OnRtcpPacketCalledOnAllBroadcastSinksForAllRtcpPackets) {
+  RtcpDemuxer demuxer;
+
+  MockRtcpPacketSink sinks[3];
+  for (MockRtcpPacketSink& sink : sinks) {
+    demuxer.AddBroadcastSink(&sink);
+  }
+
+  constexpr uint32_t ssrc = 747;
+  auto packet = CreateRtcpPacket(ssrc);
+
+  for (MockRtcpPacketSink& sink : sinks) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+  }
+
+  // RTCP received; expected calls triggered.
+  demuxer.OnRtcpPacket(packet);
+
+  // Test tear-down
+  for (const auto& sink : sinks) {
+    demuxer.RemoveBroadcastSink(&sink);
+  }
+}
+
+TEST(RtcpDemuxerTest, PacketsDeliveredInRightOrderToNonBroadcastSink) {
+  RtcpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 101;
+  MockRtcpPacketSink sink;
+  demuxer.AddSink(ssrc, &sink);
+
+  std::vector<rtc::Buffer> packets;
+  for (size_t i = 0; i < 5; i++) {
+    packets.push_back(CreateRtcpPacket(ssrc, std::to_string(i)));
+  }
+
+  InSequence sequence;
+  for (const auto& packet : packets) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+  }
+
+  for (const auto& packet : packets) {
+    demuxer.OnRtcpPacket(packet);
+  }
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, PacketsDeliveredInRightOrderToBroadcastSink) {
+  RtcpDemuxer demuxer;
+
+  MockRtcpPacketSink sink;
+  demuxer.AddBroadcastSink(&sink);
+
+  std::vector<rtc::Buffer> packets;
+  for (size_t i = 0; i < 5; i++) {
+    constexpr uint32_t ssrc = 101;
+    packets.push_back(CreateRtcpPacket(ssrc, std::to_string(i)));
+  }
+
+  InSequence sequence;
+  for (const auto& packet : packets) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+  }
+
+  for (const auto& packet : packets) {
+    demuxer.OnRtcpPacket(packet);
+  }
+
+  // Test tear-down
+  demuxer.RemoveBroadcastSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, MultipleSinksMappedToSameSsrc) {
+  RtcpDemuxer demuxer;
+
+  MockRtcpPacketSink sinks[3];
+  constexpr uint32_t ssrc = 404;
+  for (auto& sink : sinks) {
+    demuxer.AddSink(ssrc, &sink);
+  }
+
+  // Reception of an RTCP packet associated with the shared SSRC triggers the
+  // callback on all of the sinks associated with it.
+  auto packet = CreateRtcpPacket(ssrc);
+  for (auto& sink : sinks) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())));
+  }
+  demuxer.OnRtcpPacket(packet);
+
+  // Test tear-down
+  for (const auto& sink : sinks) {
+    demuxer.RemoveSink(&sink);
+  }
+}
+
+TEST(RtcpDemuxerTest, SinkMappedToMultipleSsrcs) {
+  RtcpDemuxer demuxer;
+
+  constexpr uint32_t ssrcs[] = {404, 505, 606};
+  MockRtcpPacketSink sink;
+  for (uint32_t ssrc : ssrcs) {
+    demuxer.AddSink(ssrc, &sink);
+  }
+
+  // The sink which is associated with multiple SSRCs gets the callback
+  // triggered for each of those SSRCs.
+  for (uint32_t ssrc : ssrcs) {
+    auto packet = CreateRtcpPacket(ssrc);
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())));
+    demuxer.OnRtcpPacket(packet);
+  }
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, MultipleRsidsOnSameSink) {
+  RtcpDemuxer demuxer;
+
+  // Sink associated with multiple sinks.
+  MockRtcpPacketSink sink;
+  const std::string rsids[] = {"a", "b", "c"};
+  for (const auto& rsid : rsids) {
+    demuxer.AddSink(rsid, &sink);
+  }
+
+  // RSIDs resolved to SSRCs.
+  uint32_t ssrcs[arraysize(rsids)];
+  for (size_t i = 0; i < arraysize(rsids); i++) {
+    ssrcs[i] = 1000 + static_cast<uint32_t>(i);
+    demuxer.OnRsidResolved(rsids[i], ssrcs[i]);
+  }
+
+  // Set up packets to match those RSIDs/SSRCs.
+  std::vector<rtc::Buffer> packets;
+  for (size_t i = 0; i < arraysize(rsids); i++) {
+    packets.push_back(CreateRtcpPacket(ssrcs[i]));
+  }
+
+  // The sink expects to receive all of the packets.
+  for (const auto& packet : packets) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+  }
+
+  // Packet demuxed correctly; OnRtcpPacket() triggered on sink.
+  for (const auto& packet : packets) {
+    demuxer.OnRtcpPacket(packet);
+  }
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, RsidUsedByMultipleSinks) {
+  RtcpDemuxer demuxer;
+
+  MockRtcpPacketSink sinks[3];
+  const std::string shared_rsid = "a";
+
+  for (MockRtcpPacketSink& sink : sinks) {
+    demuxer.AddSink(shared_rsid, &sink);
+  }
+
+  constexpr uint32_t shared_ssrc = 888;
+  demuxer.OnRsidResolved(shared_rsid, shared_ssrc);
+
+  auto packet = CreateRtcpPacket(shared_ssrc);
+
+  for (MockRtcpPacketSink& sink : sinks) {
+    EXPECT_CALL(sink,
+                OnRtcpPacket(ElementsAreArray(packet.cbegin(), packet.cend())))
+        .Times(1);
+  }
+
+  demuxer.OnRtcpPacket(packet);
+
+  // Test tear-down
+  for (MockRtcpPacketSink& sink : sinks) {
+    demuxer.RemoveSink(&sink);
+  }
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnSsrcSinkRemovedBeforeFirstPacket) {
+  RtcpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 404;
+  MockRtcpPacketSink sink;
+  demuxer.AddSink(ssrc, &sink);
+
+  demuxer.RemoveSink(&sink);
+
+  // The removed sink does not get callbacks.
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(packet);
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnSsrcSinkRemovedAfterFirstPacket) {
+  RtcpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 404;
+  NiceMock<MockRtcpPacketSink> sink;
+  demuxer.AddSink(ssrc, &sink);
+
+  auto before_packet = CreateRtcpPacket(ssrc);
+  demuxer.OnRtcpPacket(before_packet);
+
+  demuxer.RemoveSink(&sink);
+
+  // The removed sink does not get callbacks.
+  auto after_packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(after_packet);
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnRsidSinkRemovedBeforeRsidResolution) {
+  RtcpDemuxer demuxer;
+
+  const std::string rsid = "a";
+  constexpr uint32_t ssrc = 404;
+  MockRtcpPacketSink sink;
+  demuxer.AddSink(rsid, &sink);
+
+  // Removal before resolution.
+  demuxer.RemoveSink(&sink);
+  demuxer.OnRsidResolved(rsid, ssrc);
+
+  // The removed sink does not get callbacks.
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(packet);
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnRsidSinkRemovedAfterRsidResolution) {
+  RtcpDemuxer demuxer;
+
+  const std::string rsid = "a";
+  constexpr uint32_t ssrc = 404;
+  MockRtcpPacketSink sink;
+  demuxer.AddSink(rsid, &sink);
+
+  // Removal after resolution.
+  demuxer.OnRsidResolved(rsid, ssrc);
+  demuxer.RemoveSink(&sink);
+
+  // The removed sink does not get callbacks.
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(packet);
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnBroadcastSinkRemovedBeforeFirstPacket) {
+  RtcpDemuxer demuxer;
+
+  MockRtcpPacketSink sink;
+  demuxer.AddBroadcastSink(&sink);
+
+  demuxer.RemoveBroadcastSink(&sink);
+
+  // The removed sink does not get callbacks.
+  constexpr uint32_t ssrc = 404;
+  auto packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(packet);
+}
+
+TEST(RtcpDemuxerTest, NoCallbackOnBroadcastSinkRemovedAfterFirstPacket) {
+  RtcpDemuxer demuxer;
+
+  NiceMock<MockRtcpPacketSink> sink;
+  demuxer.AddBroadcastSink(&sink);
+
+  constexpr uint32_t ssrc = 404;
+  auto before_packet = CreateRtcpPacket(ssrc);
+  demuxer.OnRtcpPacket(before_packet);
+
+  demuxer.RemoveBroadcastSink(&sink);
+
+  // The removed sink does not get callbacks.
+  auto after_packet = CreateRtcpPacket(ssrc);
+  EXPECT_CALL(sink, OnRtcpPacket(_)).Times(0);  // Not called.
+  demuxer.OnRtcpPacket(after_packet);
+}
+
+// The RSID to SSRC mapping should be one-to-one. If we end up receiving
+// two (or more) packets with the same SSRC, but different RSIDs, we guarantee
+// remembering the first one; no guarantees are made about further associations.
+TEST(RtcpDemuxerTest, FirstRsolutionOfRsidNotForgotten) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  const std::string rsid = "a";
+  demuxer.AddSink(rsid, &sink);
+
+  constexpr uint32_t ssrc_a = 111;  // First resolution - guaranteed effective.
+  demuxer.OnRsidResolved(rsid, ssrc_a);
+
+  constexpr uint32_t ssrc_b = 222;  // Second resolution - no guarantees.
+  demuxer.OnRsidResolved(rsid, ssrc_b);
+
+  auto packet_a = CreateRtcpPacket(ssrc_a);
+  EXPECT_CALL(
+      sink, OnRtcpPacket(ElementsAreArray(packet_a.cbegin(), packet_a.cend())))
+      .Times(1);
+  demuxer.OnRtcpPacket(packet_a);
+
+  auto packet_b = CreateRtcpPacket(ssrc_b);
+  EXPECT_CALL(
+      sink, OnRtcpPacket(ElementsAreArray(packet_b.cbegin(), packet_b.cend())))
+      .Times(AtLeast(0));
+  demuxer.OnRtcpPacket(packet_b);
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
+TEST(RtcpDemuxerTest, RepeatedSsrcToSinkAssociationsDisallowed) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  constexpr uint32_t ssrc = 101;
+  demuxer.AddSink(ssrc, &sink);
+  EXPECT_DEATH(demuxer.AddSink(ssrc, &sink), "");
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, RepeatedRsidToSinkAssociationsDisallowed) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  const std::string rsid = "z";
+  demuxer.AddSink(rsid, &sink);
+  EXPECT_DEATH(demuxer.AddSink(rsid, &sink), "");
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, RepeatedBroadcastSinkRegistrationDisallowed) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  demuxer.AddBroadcastSink(&sink);
+  EXPECT_DEATH(demuxer.AddBroadcastSink(&sink), "");
+
+  // Test tear-down
+  demuxer.RemoveBroadcastSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, SsrcSinkCannotAlsoBeRegisteredAsBroadcast) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  constexpr uint32_t ssrc = 101;
+  demuxer.AddSink(ssrc, &sink);
+  EXPECT_DEATH(demuxer.AddBroadcastSink(&sink), "");
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, RsidSinkCannotAlsoBeRegisteredAsBroadcast) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  const std::string rsid = "z";
+  demuxer.AddSink(rsid, &sink);
+  EXPECT_DEATH(demuxer.AddBroadcastSink(&sink), "");
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, BroadcastSinkCannotAlsoBeRegisteredAsSsrcSink) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  demuxer.AddBroadcastSink(&sink);
+  constexpr uint32_t ssrc = 101;
+  EXPECT_DEATH(demuxer.AddSink(ssrc, &sink), "");
+
+  // Test tear-down
+  demuxer.RemoveBroadcastSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, BroadcastSinkCannotAlsoBeRegisteredAsRsidSink) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  demuxer.AddBroadcastSink(&sink);
+  const std::string rsid = "j";
+  EXPECT_DEATH(demuxer.AddSink(rsid, &sink), "");
+
+  // Test tear-down
+  demuxer.RemoveBroadcastSink(&sink);
+}
+
+TEST(RtcpDemuxerTest, MayNotCallRemoveSinkOnNeverAddedSink) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  EXPECT_DEATH(demuxer.RemoveSink(&sink), "");
+}
+
+TEST(RtcpDemuxerTest, MayNotCallRemoveBroadcastSinkOnNeverAddedSink) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+
+  EXPECT_DEATH(demuxer.RemoveBroadcastSink(&sink), "");
+}
+
+TEST(RtcpDemuxerTest, RsidMustBeNonEmpty) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+  EXPECT_DEATH(demuxer.AddSink("", &sink), "");
+}
+
+TEST(RtcpDemuxerTest, RsidMustBeAlphaNumeric) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+  EXPECT_DEATH(demuxer.AddSink("a_3", &sink), "");
+}
+
+TEST(RtcpDemuxerTest, RsidMustNotExceedMaximumLength) {
+  RtcpDemuxer demuxer;
+  MockRtcpPacketSink sink;
+  std::string rsid(StreamId::kMaxSize + 1, 'a');
+  EXPECT_DEATH(demuxer.AddSink(rsid, &sink), "");
+}
+#endif
+}  // namespace webrtc
diff --git a/call/rtcp_packet_sink_interface.h b/call/rtcp_packet_sink_interface.h
new file mode 100644
index 0000000..e26bd37
--- /dev/null
+++ b/call/rtcp_packet_sink_interface.h
@@ -0,0 +1,29 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef WEBRTC_CALL_RTCP_PACKET_SINK_INTERFACE_H_
+#define WEBRTC_CALL_RTCP_PACKET_SINK_INTERFACE_H_
+
+#include "webrtc/base/array_view.h"
+
+namespace webrtc {
+
+// This class represents a receiver of unparsed RTCP packets.
+// TODO(eladalon): Replace this by demuxing over parsed rather than raw data.
+// Whether this should be over an entire RTCP packet, or over RTCP blocks,
+// is still under discussion.
+class RtcpPacketSinkInterface {
+ public:
+  virtual ~RtcpPacketSinkInterface() = default;
+  virtual void OnRtcpPacket(rtc::ArrayView<const uint8_t> packet) = 0;
+};
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RTCP_PACKET_SINK_INTERFACE_H_
diff --git a/call/rtp_demuxer.cc b/call/rtp_demuxer.cc
index 620d4b1..e1957ed 100644
--- a/call/rtp_demuxer.cc
+++ b/call/rtp_demuxer.cc
@@ -12,50 +12,27 @@
 
 #include "webrtc/base/checks.h"
 #include "webrtc/base/logging.h"
+#include "webrtc/call/rsid_resolution_observer.h"
 #include "webrtc/call/rtp_packet_sink_interface.h"
+#include "webrtc/call/rtp_rtcp_demuxer_helper.h"
 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extensions.h"
 #include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
 
 namespace webrtc {
 
 namespace {
-
 constexpr size_t kMaxProcessedSsrcs = 1000;  // Prevent memory overuse.
-
-template <typename Key, typename Value>
-bool MultimapAssociationExists(const std::multimap<Key, Value>& multimap,
-                               Key key,
-                               Value val) {
-  auto it_range = multimap.equal_range(key);
-  using Reference = typename std::multimap<Key, Value>::const_reference;
-  return std::any_of(it_range.first, it_range.second,
-                     [val](Reference elem) { return elem.second == val; });
-}
-
-template <typename Key, typename Value>
-size_t RemoveFromMultimapByValue(std::multimap<Key, Value*>* multimap,
-                                 const Value* value) {
-  size_t count = 0;
-  for (auto it = multimap->begin(); it != multimap->end();) {
-    if (it->second == value) {
-      it = multimap->erase(it);
-      ++count;
-    } else {
-      ++it;
-    }
-  }
-  return count;
-}
-
 }  // namespace
 
-RtpDemuxer::RtpDemuxer() {}
+RtpDemuxer::RtpDemuxer() = default;
 
 RtpDemuxer::~RtpDemuxer() {
   RTC_DCHECK(sinks_.empty());
+  RTC_DCHECK(rsid_sinks_.empty());
 }
 
 void RtpDemuxer::AddSink(uint32_t ssrc, RtpPacketSinkInterface* sink) {
+  RTC_DCHECK(sink);
   RecordSsrcToSinkAssociation(ssrc, sink);
 }
 
@@ -88,7 +65,7 @@
 }
 
 bool RtpDemuxer::OnRtpPacket(const RtpPacketReceived& packet) {
-  FindSsrcAssociations(packet);
+  ResolveAssociations(packet);
   auto it_range = sinks_.equal_range(packet.Ssrc());
   for (auto it = it_range.first; it != it_range.second; ++it) {
     it->second->OnRtpPacket(packet);
@@ -96,14 +73,45 @@
   return it_range.first != it_range.second;
 }
 
-void RtpDemuxer::FindSsrcAssociations(const RtpPacketReceived& packet) {
+void RtpDemuxer::RegisterRsidResolutionObserver(
+    RsidResolutionObserver* observer) {
+  RTC_DCHECK(observer);
+  RTC_DCHECK(!ContainerHasKey(rsid_resolution_observers_, observer));
+
+  rsid_resolution_observers_.push_back(observer);
+
+  processed_ssrcs_.clear();  // New observer requires new notifications.
+}
+
+void RtpDemuxer::DeregisterRsidResolutionObserver(
+    const RsidResolutionObserver* observer) {
+  RTC_DCHECK(observer);
+  auto it = std::find(rsid_resolution_observers_.begin(),
+                      rsid_resolution_observers_.end(), observer);
+  RTC_DCHECK(it != rsid_resolution_observers_.end());
+  rsid_resolution_observers_.erase(it);
+}
+
+void RtpDemuxer::ResolveAssociations(const RtpPacketReceived& packet) {
   // Avoid expensive string comparisons for RSID by looking the sinks up only
   // by SSRC whenever possible.
   if (processed_ssrcs_.find(packet.Ssrc()) != processed_ssrcs_.cend()) {
     return;
   }
 
-  // RSID-based associations:
+  ResolveRsidToSsrcAssociations(packet);
+
+  if (processed_ssrcs_.size() < kMaxProcessedSsrcs) {  // Prevent memory overuse
+    processed_ssrcs_.insert(packet.Ssrc());  // Avoid re-examining in-depth.
+  } else if (!logged_max_processed_ssrcs_exceeded_) {
+    LOG(LS_WARNING) << "More than " << kMaxProcessedSsrcs
+                    << " different SSRCs seen.";
+    logged_max_processed_ssrcs_exceeded_ = true;
+  }
+}
+
+void RtpDemuxer::ResolveRsidToSsrcAssociations(
+    const RtpPacketReceived& packet) {
   std::string rsid;
   if (packet.GetExtension<RtpStreamId>(&rsid)) {
     // All streams associated with this RSID need to be marked as associated
@@ -113,17 +121,18 @@
       RecordSsrcToSinkAssociation(packet.Ssrc(), it->second);
     }
 
+    NotifyObserversOfRsidResolution(rsid, packet.Ssrc());
+
     // To prevent memory-overuse attacks, forget this RSID. Future packets
     // with this RSID, but a different SSRC, will not spawn new associations.
     rsid_sinks_.erase(it_range.first, it_range.second);
   }
+}
 
-  if (processed_ssrcs_.size() < kMaxProcessedSsrcs) {  // Prevent memory overuse
-    processed_ssrcs_.insert(packet.Ssrc());  // Avoid re-examining in-depth.
-  } else if (!logged_max_processed_ssrcs_exceeded_) {
-    LOG(LS_WARNING) << "More than " << kMaxProcessedSsrcs
-                    << " different SSRCs seen.";
-    logged_max_processed_ssrcs_exceeded_ = true;
+void RtpDemuxer::NotifyObserversOfRsidResolution(const std::string& rsid,
+                                                 uint32_t ssrc) {
+  for (auto* observer : rsid_resolution_observers_) {
+    observer->OnRsidResolved(rsid, ssrc);
   }
 }
 
diff --git a/call/rtp_demuxer.h b/call/rtp_demuxer.h
index 6a4370d..9ec9378 100644
--- a/call/rtp_demuxer.h
+++ b/call/rtp_demuxer.h
@@ -14,9 +14,11 @@
 #include <map>
 #include <set>
 #include <string>
+#include <vector>
 
 namespace webrtc {
 
+class RsidResolutionObserver;
 class RtpPacketReceived;
 class RtpPacketSinkInterface;
 
@@ -41,9 +43,16 @@
   // Null pointer is not allowed.
   bool RemoveSink(const RtpPacketSinkInterface* sink);
 
-  // Returns true if at least one matching sink was found, otherwise false.
+  // Returns true if at least one matching sink was found.
   bool OnRtpPacket(const RtpPacketReceived& packet);
 
+  // Allows other objects to be notified when RSID-SSRC associations are
+  // resolved by this object.
+  void RegisterRsidResolutionObserver(RsidResolutionObserver* observer);
+
+  // Undo a previous RegisterRsidResolutionObserver().
+  void DeregisterRsidResolutionObserver(const RsidResolutionObserver* observer);
+
  private:
   // Records a sink<->SSRC association. This can happen by explicit
   // configuration by AddSink(ssrc...), or by inferred configuration from an
@@ -51,9 +60,14 @@
   // packet reception.
   void RecordSsrcToSinkAssociation(uint32_t ssrc, RtpPacketSinkInterface* sink);
 
-  // When a new packet arrives, we attempt to resolve extra associations,
-  // such as which RSIDs are associated with which SSRCs.
-  void FindSsrcAssociations(const RtpPacketReceived& packet);
+  // When a new packet arrives, we attempt to resolve extra associations.
+  void ResolveAssociations(const RtpPacketReceived& packet);
+
+  // Find the associations of RSID to SSRCs.
+  void ResolveRsidToSsrcAssociations(const RtpPacketReceived& packet);
+
+  // Notify observers of the resolution of an RSID to an SSRC.
+  void NotifyObserversOfRsidResolution(const std::string& rsid, uint32_t ssrc);
 
   // This records the association SSRCs to sinks. Other associations, such
   // as by RSID, also end up here once the RSID, etc., is resolved to an SSRC.
@@ -73,6 +87,10 @@
 
   // Avoid an attack that would create excessive logging.
   bool logged_max_processed_ssrcs_exceeded_ = false;
+
+  // Observers which will be notified when an RSID association to an SSRC is
+  // resolved by this object.
+  std::vector<RsidResolutionObserver*> rsid_resolution_observers_;
 };
 
 }  // namespace webrtc
diff --git a/call/rtp_demuxer_unittest.cc b/call/rtp_demuxer_unittest.cc
index 8ca827c..56bfe3c 100644
--- a/call/rtp_demuxer_unittest.cc
+++ b/call/rtp_demuxer_unittest.cc
@@ -14,9 +14,12 @@
 #include <string>
 
 #include "webrtc/base/arraysize.h"
+#include "webrtc/base/basictypes.h"
 #include "webrtc/base/checks.h"
 #include "webrtc/base/ptr_util.h"
+#include "webrtc/call/rsid_resolution_observer.h"
 #include "webrtc/call/rtp_packet_sink_interface.h"
+#include "webrtc/common_types.h"
 #include "webrtc/modules/rtp_rtcp/include/rtp_header_extension_map.h"
 #include "webrtc/modules/rtp_rtcp/source/rtp_header_extensions.h"
 #include "webrtc/modules/rtp_rtcp/source/rtp_packet_received.h"
@@ -37,6 +40,11 @@
   MOCK_METHOD1(OnRtpPacket, void(const RtpPacketReceived&));
 };
 
+class MockRsidResolutionObserver : public RsidResolutionObserver {
+ public:
+  MOCK_METHOD2(OnRsidResolved, void(const std::string& rsid, uint32_t ssrc));
+};
+
 MATCHER_P(SamePacketAs, other, "") {
   return arg.Ssrc() == other.Ssrc() &&
          arg.SequenceNumber() == other.SequenceNumber();
@@ -119,20 +127,18 @@
 TEST(RtpDemuxerTest, PacketsDeliveredInRightOrder) {
   RtpDemuxer demuxer;
 
-  constexpr uint32_t ssrcs[] = {101, 202, 303};
-  MockRtpPacketSink sinks[arraysize(ssrcs)];
-  for (size_t i = 0; i < arraysize(ssrcs); i++) {
-    demuxer.AddSink(ssrcs[i], &sinks[i]);
-  }
+  constexpr uint32_t ssrc = 101;
+  MockRtpPacketSink sink;
+  demuxer.AddSink(ssrc, &sink);
 
   std::unique_ptr<RtpPacketReceived> packets[5];
   for (size_t i = 0; i < arraysize(packets); i++) {
-    packets[i] = CreateRtpPacketReceived(ssrcs[0], i);
+    packets[i] = CreateRtpPacketReceived(ssrc, i);
   }
 
   InSequence sequence;
   for (const auto& packet : packets) {
-    EXPECT_CALL(sinks[0], OnRtpPacket(SamePacketAs(*packet))).Times(1);
+    EXPECT_CALL(sink, OnRtpPacket(SamePacketAs(*packet))).Times(1);
   }
 
   for (const auto& packet : packets) {
@@ -140,9 +146,7 @@
   }
 
   // Test tear-down
-  for (const auto& sink : sinks) {
-    demuxer.RemoveSink(&sink);
-  }
+  demuxer.RemoveSink(&sink);
 }
 
 TEST(RtpDemuxerTest, MultipleSinksMappedToSameSsrc) {
@@ -424,6 +428,31 @@
   demuxer.RemoveSink(&sink);
 }
 
+TEST(RtpDemuxerTest, RsidUsedByMultipleSinks) {
+  RtpDemuxer demuxer;
+
+  MockRtpPacketSink sinks[3];
+  const std::string shared_rsid = "a";
+
+  for (MockRtpPacketSink& sink : sinks) {
+    demuxer.AddSink(shared_rsid, &sink);
+  }
+
+  constexpr uint32_t shared_ssrc = 888;
+  auto packet = CreateRtpPacketReceivedWithRsid(shared_rsid, shared_ssrc);
+
+  for (auto& sink : sinks) {
+    EXPECT_CALL(sink, OnRtpPacket(SamePacketAs(*packet))).Times(1);
+  }
+
+  EXPECT_TRUE(demuxer.OnRtpPacket(*packet));
+
+  // Test tear-down
+  for (MockRtpPacketSink& sink : sinks) {
+    demuxer.RemoveSink(&sink);
+  }
+}
+
 TEST(RtpDemuxerTest, SinkWithBothRsidAndSsrcAssociations) {
   RtpDemuxer demuxer;
 
@@ -468,6 +497,88 @@
   demuxer.RemoveSink(&sink);
 }
 
+TEST(RtpDemuxerTest, RsidObserversInformedOfResolutions) {
+  RtpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 111;
+  const std::string rsid = "a";
+
+  MockRsidResolutionObserver rsid_resolution_observers[3];
+  for (auto& observer : rsid_resolution_observers) {
+    demuxer.RegisterRsidResolutionObserver(&observer);
+    EXPECT_CALL(observer, OnRsidResolved(rsid, ssrc)).Times(1);
+  }
+
+  // The expected calls to OnRsidResolved() will be triggered by this.
+  demuxer.OnRtpPacket(*CreateRtpPacketReceivedWithRsid(rsid, ssrc));
+
+  // Test tear-down
+  for (auto& observer : rsid_resolution_observers) {
+    demuxer.DeregisterRsidResolutionObserver(&observer);
+  }
+}
+
+// Normally, we only produce one notification per resolution (though no such
+// guarantee is made), but when a new observer is added, we reset
+// this suppression - we "re-resolve" associations for the benefit of the
+// new observer..
+TEST(RtpDemuxerTest, NotificationSuppressionResetWhenNewObserverAdded) {
+  RtpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 111;
+  const std::string rsid = "a";
+
+  // First observer registered, then gets a notification.
+  NiceMock<MockRsidResolutionObserver> first_observer;
+  demuxer.RegisterRsidResolutionObserver(&first_observer);
+  demuxer.OnRtpPacket(*CreateRtpPacketReceivedWithRsid(rsid, ssrc));
+
+  // Second observer registered, then gets a notification. No guarantee is made
+  // about whether the first observer would get an additional notification.
+  MockRsidResolutionObserver second_observer;
+  demuxer.RegisterRsidResolutionObserver(&second_observer);
+  EXPECT_CALL(first_observer, OnRsidResolved(rsid, ssrc)).Times(AtLeast(0));
+  EXPECT_CALL(second_observer, OnRsidResolved(rsid, ssrc)).Times(1);
+  demuxer.OnRtpPacket(*CreateRtpPacketReceivedWithRsid(rsid, ssrc));
+
+  // Test tear-down
+  demuxer.DeregisterRsidResolutionObserver(&first_observer);
+  demuxer.DeregisterRsidResolutionObserver(&second_observer);
+}
+
+TEST(RtpDemuxerTest, DeregisteredRsidObserversNotInformedOfResolutions) {
+  RtpDemuxer demuxer;
+
+  constexpr uint32_t ssrc = 111;
+  const std::string rsid = "a";
+  NiceMock<MockRtpPacketSink> sink;
+  demuxer.AddSink(rsid, &sink);
+
+  // Register several, then deregister only one, to show that not all of the
+  // observers had been forgotten when one was removed.
+  MockRsidResolutionObserver observer_1;
+  MockRsidResolutionObserver observer_2_removed;
+  MockRsidResolutionObserver observer_3;
+
+  demuxer.RegisterRsidResolutionObserver(&observer_1);
+  demuxer.RegisterRsidResolutionObserver(&observer_2_removed);
+  demuxer.RegisterRsidResolutionObserver(&observer_3);
+
+  demuxer.DeregisterRsidResolutionObserver(&observer_2_removed);
+
+  EXPECT_CALL(observer_1, OnRsidResolved(rsid, ssrc)).Times(1);
+  EXPECT_CALL(observer_2_removed, OnRsidResolved(_, _)).Times(0);
+  EXPECT_CALL(observer_3, OnRsidResolved(rsid, ssrc)).Times(1);
+
+  // The expected calls to OnRsidResolved() will be triggered by this.
+  demuxer.OnRtpPacket(*CreateRtpPacketReceivedWithRsid(rsid, ssrc));
+
+  // Test tear-down
+  demuxer.RemoveSink(&sink);
+  demuxer.DeregisterRsidResolutionObserver(&observer_1);
+  demuxer.DeregisterRsidResolutionObserver(&observer_3);
+}
+
 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
 TEST(RtpDemuxerTest, RsidMustBeNonEmpty) {
   RtpDemuxer demuxer;
@@ -493,7 +604,25 @@
   MockRtpPacketSink sink;
   demuxer.AddSink("a", &sink);
   EXPECT_DEATH(demuxer.AddSink("a", &sink), "");
+  demuxer.RemoveSink(&sink);
 }
+
+TEST(RtpDemuxerTest,
+     DoubleRegisterationOfNeverRegisteredRsidResolutionObserverDisallowed) {
+  RtpDemuxer demuxer;
+  MockRsidResolutionObserver observer;
+  demuxer.RegisterRsidResolutionObserver(&observer);
+  EXPECT_DEATH(demuxer.RegisterRsidResolutionObserver(&observer), "");
+  demuxer.DeregisterRsidResolutionObserver(&observer);
+}
+
+TEST(RtpDemuxerTest,
+     DregisterationOfNeverRegisteredRsidResolutionObserverDisallowed) {
+  RtpDemuxer demuxer;
+  MockRsidResolutionObserver observer;
+  EXPECT_DEATH(demuxer.DeregisterRsidResolutionObserver(&observer), "");
+}
+
 #endif
 
 }  // namespace
diff --git a/call/rtp_packet_sink_interface.h b/call/rtp_packet_sink_interface.h
index 900ca35..0b3e64e 100644
--- a/call/rtp_packet_sink_interface.h
+++ b/call/rtp_packet_sink_interface.h
@@ -14,10 +14,10 @@
 
 class RtpPacketReceived;
 
-// This class represents a receiver of an already parsed RTP packets.
+// This class represents a receiver of already parsed RTP packets.
 class RtpPacketSinkInterface {
  public:
-  virtual ~RtpPacketSinkInterface() {}
+  virtual ~RtpPacketSinkInterface() = default;
   virtual void OnRtpPacket(const RtpPacketReceived& packet) = 0;
 };
 
diff --git a/call/rtp_rtcp_demuxer_helper.cc b/call/rtp_rtcp_demuxer_helper.cc
new file mode 100644
index 0000000..e8d3cbf
--- /dev/null
+++ b/call/rtp_rtcp_demuxer_helper.cc
@@ -0,0 +1,55 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "webrtc/call/rtp_rtcp_demuxer_helper.h"
+
+#include "webrtc/modules/rtp_rtcp/source/byte_io.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/bye.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/common_header.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/psfb.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/rtpfb.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
+
+namespace webrtc {
+
+rtc::Optional<uint32_t> ParseRtcpPacketSenderSsrc(
+    rtc::ArrayView<const uint8_t> packet) {
+  rtcp::CommonHeader header;
+  for (const uint8_t* next_packet = packet.begin(); next_packet < packet.end();
+       next_packet = header.NextPacket()) {
+    if (!header.Parse(next_packet, packet.end() - next_packet)) {
+      return rtc::Optional<uint32_t>();
+    }
+
+    switch (header.type()) {
+      case rtcp::Bye::kPacketType:
+      case rtcp::ExtendedReports::kPacketType:
+      case rtcp::Psfb::kPacketType:
+      case rtcp::ReceiverReport::kPacketType:
+      case rtcp::Rtpfb::kPacketType:
+      case rtcp::SenderReport::kPacketType: {
+        // Sender SSRC at the beginning of the RTCP payload.
+        if (header.payload_size_bytes() >= sizeof(uint32_t)) {
+          const uint32_t ssrc_sender =
+              ByteReader<uint32_t>::ReadBigEndian(header.payload());
+          return rtc::Optional<uint32_t>(ssrc_sender);
+        } else {
+          return rtc::Optional<uint32_t>();
+        }
+      }
+    }
+  }
+
+  return rtc::Optional<uint32_t>();
+}
+
+}  // namespace webrtc
diff --git a/call/rtp_rtcp_demuxer_helper.h b/call/rtp_rtcp_demuxer_helper.h
new file mode 100644
index 0000000..19bd603
--- /dev/null
+++ b/call/rtp_rtcp_demuxer_helper.h
@@ -0,0 +1,67 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef WEBRTC_CALL_RTP_RTCP_DEMUXER_HELPER_H_
+#define WEBRTC_CALL_RTP_RTCP_DEMUXER_HELPER_H_
+
+#include <algorithm>
+#include <map>
+#include <utility>
+
+#include "webrtc/base/array_view.h"
+#include "webrtc/base/basictypes.h"
+#include "webrtc/base/optional.h"
+
+namespace webrtc {
+
+template <typename Container>
+bool MultimapAssociationExists(const Container& multimap,
+                               const typename Container::key_type& key,
+                               const typename Container::mapped_type& val) {
+  auto it_range = multimap.equal_range(key);
+  using Reference = typename Container::const_reference;
+  return std::any_of(it_range.first, it_range.second,
+                     [val](Reference elem) { return elem.second == val; });
+}
+
+template <typename Container, typename Value>
+size_t RemoveFromMultimapByValue(Container* multimap, const Value& value) {
+  size_t count = 0;
+  for (auto it = multimap->begin(); it != multimap->end();) {
+    if (it->second == value) {
+      it = multimap->erase(it);
+      ++count;
+    } else {
+      ++it;
+    }
+  }
+  return count;
+}
+
+template <typename Container, typename Key>
+bool ContainerHasKey(const Container& c, const Key& k) {
+  return std::find(c.cbegin(), c.cend(), k) != c.cend();
+}
+
+template <typename Container>
+bool MultimapHasValue(const Container& c,
+                      const typename Container::mapped_type& v) {
+  auto predicate = [v](const typename Container::value_type& it) {
+    return it.second == v;
+  };
+  return std::any_of(c.cbegin(), c.cend(), predicate);
+}
+
+rtc::Optional<uint32_t> ParseRtcpPacketSenderSsrc(
+    rtc::ArrayView<const uint8_t> packet);
+
+}  // namespace webrtc
+
+#endif  // WEBRTC_CALL_RTP_RTCP_DEMUXER_HELPER_H_
diff --git a/call/rtp_rtcp_demuxer_helper_unittest.cc b/call/rtp_rtcp_demuxer_helper_unittest.cc
new file mode 100644
index 0000000..e51002f
--- /dev/null
+++ b/call/rtp_rtcp_demuxer_helper_unittest.cc
@@ -0,0 +1,119 @@
+/*
+ *  Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <cstdio>
+
+#include "webrtc/call/rtp_rtcp_demuxer_helper.h"
+
+#include "webrtc/base/arraysize.h"
+#include "webrtc/base/basictypes.h"
+#include "webrtc/base/buffer.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/bye.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/extended_jitter_report.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/extended_reports.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/pli.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/rapid_resync_request.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/receiver_report.h"
+#include "webrtc/modules/rtp_rtcp/source/rtcp_packet/sender_report.h"
+#include "webrtc/test/gtest.h"
+
+namespace webrtc {
+
+namespace {
+constexpr uint32_t kSsrc = 8374;
+}  // namespace
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_ByePacket) {
+  webrtc::rtcp::Bye rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest,
+     ParseRtcpPacketSenderSsrc_ExtendedReportsPacket) {
+  webrtc::rtcp::ExtendedReports rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_PsfbPacket) {
+  webrtc::rtcp::Pli rtcp_packet;  // Psfb is abstract; use a subclass.
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_ReceiverReportPacket) {
+  webrtc::rtcp::ReceiverReport rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_RtpfbPacket) {
+  // Rtpfb is abstract; use a subclass.
+  webrtc::rtcp::RapidResyncRequest rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_SenderReportPacket) {
+  webrtc::rtcp::SenderReport rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_EQ(ssrc, kSsrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_MalformedRtcpPacket) {
+  uint8_t garbage[100];
+  memset(&garbage[0], 0, arraysize(garbage));
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(garbage);
+  EXPECT_FALSE(ssrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest,
+     ParseRtcpPacketSenderSsrc_RtcpMessageWithoutSenderSsrc) {
+  webrtc::rtcp::ExtendedJitterReport rtcp_packet;  // Has no sender SSRC.
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(raw_packet);
+  EXPECT_FALSE(ssrc);
+}
+
+TEST(RtpRtcpDemuxerHelperTest, ParseRtcpPacketSenderSsrc_TruncatedRtcpMessage) {
+  webrtc::rtcp::Bye rtcp_packet;
+  rtcp_packet.SetSenderSsrc(kSsrc);
+  rtc::Buffer raw_packet = rtcp_packet.Build();
+
+  constexpr size_t rtcp_length_bytes = 8;
+  ASSERT_EQ(rtcp_length_bytes, raw_packet.size());
+
+  rtc::Optional<uint32_t> ssrc = ParseRtcpPacketSenderSsrc(
+      rtc::ArrayView<const uint8_t>(raw_packet.data(), rtcp_length_bytes - 1));
+  EXPECT_FALSE(ssrc);
+}
+
+}  // namespace webrtc