Reland "Reland "Extracts ssrc based feedback tracking from feedback adapter.""

This reverts commit d2d7a47247187236ce62e3c842963f6e4e9f0f1f.

Reason for revert: This revert is not needed. Failure was not due to webrtc.

Original change's description:
> Revert "Reland "Extracts ssrc based feedback tracking from feedback adapter.""
> 
> This reverts commit d61338fa6ed957dd992f25da4844db34b14f89c7.
> 
> Reason for revert: Causing a build break:
> webrtc/call/BUILD:300:1: Undeclared inclusion(s) in rule 'webrtc/call:rtp_sender':
> this rule is missing dependency declarations for the following files included by 'call/rtp_transport_controller_send.cc':
>   'webrtc/modules/congestion_controller/rtp/transport_feedback_demuxer.h'
> 
> 
> 
> Original change's description:
> > Reland "Extracts ssrc based feedback tracking from feedback adapter."
> > 
> > This is a reland of 08c46adc1e9f9a8d74357fe132a68906ae6e6974
> > 
> > Original change's description:
> > > Extracts ssrc based feedback tracking from feedback adapter.
> > > 
> > > This prepares for moving TransportFeedbackAdapter to TaskQueue.
> > > 
> > > Bug: webrtc:9883
> > > Change-Id: Ib333f6a6837ff6dd8b10813e8953e4d8cd5d8633
> > > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162040
> > > Reviewed-by: Erik Språng <sprang@webrtc.org>
> > > Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> > > Cr-Commit-Position: refs/heads/master@{#30076}
> > 
> > Bug: webrtc:9883
> > Change-Id: Ia74a3b1fba4d83eece9b0eb6475d6e6aecb65700
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/162201
> > Reviewed-by: Erik Språng <sprang@webrtc.org>
> > Commit-Queue: Sebastian Jansson <srte@webrtc.org>
> > Cr-Commit-Position: refs/heads/master@{#30266}
> 
> TBR=sprang@webrtc.org,srte@webrtc.org
> 
> Change-Id: I7f3f872c7ff863a37ad8dca08051fe1e04671bfb
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Bug: webrtc:9883
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166182
> Reviewed-by: JT Teh <jtteh@webrtc.org>
> Commit-Queue: JT Teh <jtteh@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#30270}

TBR=sprang@webrtc.org,srte@webrtc.org,jtteh@webrtc.org

Change-Id: Idd1073ebfef77b2154d7123b47dacb479537c550
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:9883
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166200
Reviewed-by: JT Teh <jtteh@webrtc.org>
Commit-Queue: JT Teh <jtteh@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30271}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index a5878ab..76dbc45 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -228,7 +228,7 @@
 }
 StreamFeedbackProvider*
 RtpTransportControllerSend::GetStreamFeedbackProvider() {
-  return &transport_feedback_adapter_;
+  return &feedback_demuxer_;
 }
 
 void RtpTransportControllerSend::RegisterTargetTransferRateObserver(
@@ -468,6 +468,8 @@
 
 void RtpTransportControllerSend::OnAddPacket(
     const RtpPacketSendInfo& packet_info) {
+  feedback_demuxer_.AddPacket(packet_info);
+
   transport_feedback_adapter_.AddPacket(
       packet_info,
       send_side_bwe_with_overhead_ ? transport_overhead_bytes_per_packet_.load()
@@ -478,6 +480,7 @@
 void RtpTransportControllerSend::OnTransportFeedback(
     const rtcp::TransportFeedback& feedback) {
   RTC_DCHECK_RUNS_SERIALIZED(&worker_race_);
+  feedback_demuxer_.OnTransportFeedback(feedback);
 
   absl::optional<TransportPacketsFeedback> feedback_msg =
       transport_feedback_adapter_.ProcessTransportFeedback(
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index b5a53d7..4e9ff4d 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -24,6 +24,7 @@
 #include "call/rtp_video_sender.h"
 #include "modules/congestion_controller/rtp/control_handler.h"
 #include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
+#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
 #include "modules/pacing/paced_sender.h"
 #include "modules/pacing/packet_router.h"
 #include "modules/pacing/rtp_packet_pacer.h"
@@ -149,6 +150,7 @@
   std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
 
   TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);
+  TransportFeedbackDemuxer feedback_demuxer_;
 
   // TODO(srte): Move all access to feedback adapter to task queue.
   TransportFeedbackAdapter transport_feedback_adapter_;
diff --git a/modules/congestion_controller/rtp/BUILD.gn b/modules/congestion_controller/rtp/BUILD.gn
index 36a9b25..38a4bf1 100644
--- a/modules/congestion_controller/rtp/BUILD.gn
+++ b/modules/congestion_controller/rtp/BUILD.gn
@@ -45,6 +45,8 @@
   sources = [
     "transport_feedback_adapter.cc",
     "transport_feedback_adapter.h",
+    "transport_feedback_demuxer.cc",
+    "transport_feedback_demuxer.h",
   ]
 
   deps = [
@@ -69,6 +71,7 @@
 
     sources = [
       "transport_feedback_adapter_unittest.cc",
+      "transport_feedback_demuxer_unittest.cc",
     ]
     deps = [
       ":transport_feedback",
diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.cc b/modules/congestion_controller/rtp/transport_feedback_adapter.cc
index 877ee8e..efb88d2 100644
--- a/modules/congestion_controller/rtp/transport_feedback_adapter.cc
+++ b/modules/congestion_controller/rtp/transport_feedback_adapter.cc
@@ -66,30 +66,6 @@
 
 TransportFeedbackAdapter::TransportFeedbackAdapter() = default;
 
-TransportFeedbackAdapter::~TransportFeedbackAdapter() {
-  RTC_DCHECK(observers_.empty());
-}
-
-void TransportFeedbackAdapter::RegisterStreamFeedbackObserver(
-    std::vector<uint32_t> ssrcs,
-    StreamFeedbackObserver* observer) {
-  rtc::CritScope cs(&observers_lock_);
-  RTC_DCHECK(observer);
-  RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
-               return pair.second == observer;
-             }) == observers_.end());
-  observers_.push_back({ssrcs, observer});
-}
-
-void TransportFeedbackAdapter::DeRegisterStreamFeedbackObserver(
-    StreamFeedbackObserver* observer) {
-  rtc::CritScope cs(&observers_lock_);
-  RTC_DCHECK(observer);
-  const auto it = absl::c_find_if(
-      observers_, [=](const auto& pair) { return pair.second == observer; });
-  RTC_DCHECK(it != observers_.end());
-  observers_.erase(it);
-}
 
 void TransportFeedbackAdapter::AddPacket(const RtpPacketSendInfo& packet_info,
                                          size_t overhead_bytes,
@@ -104,10 +80,6 @@
     packet.local_net_id = local_net_id_;
     packet.remote_net_id = remote_net_id_;
     packet.sent.pacing_info = packet_info.pacing_info;
-    if (packet_info.has_rtp_sequence_number) {
-      packet.ssrc = packet_info.ssrc;
-      packet.rtp_sequence_number = packet_info.rtp_sequence_number;
-    }
 
     while (!history_.empty() &&
            creation_time - history_.begin()->second.creation_time >
@@ -168,32 +140,25 @@
     RTC_LOG(LS_INFO) << "Empty transport feedback packet received.";
     return absl::nullopt;
   }
-  std::vector<PacketFeedback> feedback_vector;
+
+  rtc::CritScope cs(&lock_);
   TransportPacketsFeedback msg;
   msg.feedback_time = feedback_receive_time;
-  {
-    rtc::CritScope cs(&lock_);
-    msg.prior_in_flight =
-        in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
-    feedback_vector =
-        ProcessTransportFeedbackInner(feedback, feedback_receive_time);
-    if (feedback_vector.empty())
-      return absl::nullopt;
 
-    for (const PacketFeedback& fb : feedback_vector) {
-      PacketResult res;
-      res.sent_packet = fb.sent;
-      res.receive_time = fb.receive_time;
-      msg.packet_feedbacks.push_back(res);
-    }
-    auto it = history_.find(last_ack_seq_num_);
-    if (it != history_.end()) {
-      msg.first_unacked_send_time = it->second.sent.send_time;
-    }
-    msg.data_in_flight =
-        in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
+  msg.prior_in_flight =
+      in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
+  msg.packet_feedbacks =
+      ProcessTransportFeedbackInner(feedback, feedback_receive_time);
+  if (msg.packet_feedbacks.empty())
+    return absl::nullopt;
+
+  auto it = history_.find(last_ack_seq_num_);
+  if (it != history_.end()) {
+    msg.first_unacked_send_time = it->second.sent.send_time;
   }
-  SignalObservers(feedback_vector);
+  msg.data_in_flight =
+      in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
+
   return msg;
 }
 
@@ -209,7 +174,7 @@
   return in_flight_.GetOutstandingData(local_net_id_, remote_net_id_);
 }
 
-std::vector<PacketFeedback>
+std::vector<PacketResult>
 TransportFeedbackAdapter::ProcessTransportFeedbackInner(
     const rtcp::TransportFeedback& feedback,
     Timestamp feedback_time) {
@@ -232,8 +197,8 @@
   }
   last_timestamp_ = feedback.GetBaseTime();
 
-  std::vector<PacketFeedback> packet_feedback_vector;
-  packet_feedback_vector.reserve(feedback.GetPacketStatusCount());
+  std::vector<PacketResult> packet_result_vector;
+  packet_result_vector.reserve(feedback.GetPacketStatusCount());
 
   size_t failed_lookups = 0;
   size_t ignored = 0;
@@ -276,7 +241,10 @@
     }
     if (packet_feedback.local_net_id == local_net_id_ &&
         packet_feedback.remote_net_id == remote_net_id_) {
-      packet_feedback_vector.push_back(packet_feedback);
+      PacketResult result;
+      result.sent_packet = packet_feedback.sent;
+      result.receive_time = packet_feedback.receive_time;
+      packet_result_vector.push_back(result);
     } else {
       ++ignored;
     }
@@ -292,27 +260,7 @@
                      << " packets because they were sent on a different route.";
   }
 
-  return packet_feedback_vector;
-}
-
-void TransportFeedbackAdapter::SignalObservers(
-    const std::vector<PacketFeedback>& feedback_vector) {
-  rtc::CritScope cs(&observers_lock_);
-  for (auto& observer : observers_) {
-    std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
-    for (const auto& packet : feedback_vector) {
-      if (packet.ssrc && absl::c_count(observer.first, *packet.ssrc) > 0) {
-        StreamFeedbackObserver::StreamPacketInfo packet_info;
-        packet_info.ssrc = *packet.ssrc;
-        packet_info.rtp_sequence_number = packet.rtp_sequence_number;
-        packet_info.received = packet.receive_time.IsFinite();
-        selected_feedback.push_back(packet_info);
-      }
-    }
-    if (!selected_feedback.empty()) {
-      observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
-    }
-  }
+  return packet_result_vector;
 }
 
 }  // namespace webrtc
diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter.h b/modules/congestion_controller/rtp/transport_feedback_adapter.h
index 699c6ed..b6bed96 100644
--- a/modules/congestion_controller/rtp/transport_feedback_adapter.h
+++ b/modules/congestion_controller/rtp/transport_feedback_adapter.h
@@ -38,9 +38,6 @@
   // The network route ids that this packet is associated with.
   uint16_t local_net_id = 0;
   uint16_t remote_net_id = 0;
-  // The SSRC and RTP sequence number of the packet this feedback refers to.
-  absl::optional<uint32_t> ssrc;
-  uint16_t rtp_sequence_number = 0;
 };
 
 class InFlightBytesTracker {
@@ -55,16 +52,9 @@
   std::map<RemoteAndLocalNetworkId, DataSize> in_flight_data_;
 };
 
-class TransportFeedbackAdapter : public StreamFeedbackProvider {
+class TransportFeedbackAdapter {
  public:
   TransportFeedbackAdapter();
-  virtual ~TransportFeedbackAdapter();
-
-  void RegisterStreamFeedbackObserver(
-      std::vector<uint32_t> ssrcs,
-      StreamFeedbackObserver* observer) override;
-  void DeRegisterStreamFeedbackObserver(
-      StreamFeedbackObserver* observer) override;
 
   void AddPacket(const RtpPacketSendInfo& packet_info,
                  size_t overhead_bytes,
@@ -83,15 +73,10 @@
  private:
   enum class SendTimeHistoryStatus { kNotAdded, kOk, kDuplicate };
 
-  void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
-
-  std::vector<PacketFeedback> ProcessTransportFeedbackInner(
+  std::vector<PacketResult> ProcessTransportFeedbackInner(
       const rtcp::TransportFeedback& feedback,
       Timestamp feedback_time) RTC_RUN_ON(&lock_);
 
-  void SignalObservers(
-      const std::vector<PacketFeedback>& packet_feedback_vector);
-
   rtc::CriticalSection lock_;
   DataSize pending_untracked_size_ RTC_GUARDED_BY(&lock_) = DataSize::Zero();
   Timestamp last_send_time_ RTC_GUARDED_BY(&lock_) = Timestamp::MinusInfinity();
@@ -110,13 +95,6 @@
 
   uint16_t local_net_id_ RTC_GUARDED_BY(&lock_) = 0;
   uint16_t remote_net_id_ RTC_GUARDED_BY(&lock_) = 0;
-
-  rtc::CriticalSection observers_lock_;
-  // Maps a set of ssrcs to corresponding observer. Vectors are used rather than
-  // set/map to ensure that the processing order is consistent independently of
-  // the randomized ssrcs.
-  std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
-      observers_ RTC_GUARDED_BY(&observers_lock_);
 };
 
 }  // namespace webrtc
diff --git a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc
index 32e5f0a..4631dc2 100644
--- a/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc
+++ b/modules/congestion_controller/rtp/transport_feedback_adapter_unittest.cc
@@ -126,58 +126,6 @@
   std::unique_ptr<TransportFeedbackAdapter> adapter_;
 };
 
-TEST_F(TransportFeedbackAdapterTest, ObserverSanity) {
-  MockStreamFeedbackObserver mock;
-  adapter_->RegisterStreamFeedbackObserver({kSsrc}, &mock);
-
-  const std::vector<PacketResult> packets = {
-      CreatePacket(100, 200, 0, 1000, kPacingInfo0),
-      CreatePacket(110, 210, 1, 2000, kPacingInfo0),
-      CreatePacket(120, 220, 2, 3000, kPacingInfo0)};
-
-  rtcp::TransportFeedback feedback;
-  feedback.SetBase(packets[0].sent_packet.sequence_number,
-                   packets[0].receive_time.us());
-
-  for (const auto& packet : packets) {
-    OnSentPacket(packet);
-    EXPECT_TRUE(feedback.AddReceivedPacket(packet.sent_packet.sequence_number,
-                                           packet.receive_time.us()));
-  }
-
-  EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1);
-  adapter_->ProcessTransportFeedback(feedback, clock_.CurrentTime());
-
-  adapter_->DeRegisterStreamFeedbackObserver(&mock);
-
-  auto new_packet = CreatePacket(130, 230, 3, 4000, kPacingInfo0);
-  OnSentPacket(new_packet);
-
-  rtcp::TransportFeedback second_feedback;
-  second_feedback.SetBase(new_packet.sent_packet.sequence_number,
-                          new_packet.receive_time.us());
-  EXPECT_TRUE(second_feedback.AddReceivedPacket(
-      new_packet.sent_packet.sequence_number, new_packet.receive_time.us()));
-  EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
-  adapter_->ProcessTransportFeedback(second_feedback, clock_.CurrentTime());
-}
-
-#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
-TEST_F(TransportFeedbackAdapterTest, ObserverDoubleRegistrationDeathTest) {
-  MockStreamFeedbackObserver mock;
-  adapter_->RegisterStreamFeedbackObserver({0}, &mock);
-  EXPECT_DEATH(adapter_->RegisterStreamFeedbackObserver({0}, &mock), "");
-  adapter_->DeRegisterStreamFeedbackObserver(&mock);
-}
-
-TEST_F(TransportFeedbackAdapterTest, ObserverMissingDeRegistrationDeathTest) {
-  MockStreamFeedbackObserver mock;
-  adapter_->RegisterStreamFeedbackObserver({0}, &mock);
-  EXPECT_DEATH(adapter_.reset(), "");
-  adapter_->DeRegisterStreamFeedbackObserver(&mock);
-}
-#endif
-
 TEST_F(TransportFeedbackAdapterTest, AdaptsFeedbackAndPopulatesSendTimes) {
   std::vector<PacketResult> packets;
   packets.push_back(CreatePacket(100, 200, 0, 1500, kPacingInfo0));
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
new file mode 100644
index 0000000..c7893d7
--- /dev/null
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.cc
@@ -0,0 +1,88 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
+#include "absl/algorithm/container.h"
+#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
+
+namespace webrtc {
+namespace {
+static const size_t kMaxPacketsInHistory = 5000;
+}
+void TransportFeedbackDemuxer::RegisterStreamFeedbackObserver(
+    std::vector<uint32_t> ssrcs,
+    StreamFeedbackObserver* observer) {
+  rtc::CritScope cs(&observers_lock_);
+  RTC_DCHECK(observer);
+  RTC_DCHECK(absl::c_find_if(observers_, [=](const auto& pair) {
+               return pair.second == observer;
+             }) == observers_.end());
+  observers_.push_back({ssrcs, observer});
+}
+
+void TransportFeedbackDemuxer::DeRegisterStreamFeedbackObserver(
+    StreamFeedbackObserver* observer) {
+  rtc::CritScope cs(&observers_lock_);
+  RTC_DCHECK(observer);
+  const auto it = absl::c_find_if(
+      observers_, [=](const auto& pair) { return pair.second == observer; });
+  RTC_DCHECK(it != observers_.end());
+  observers_.erase(it);
+}
+
+void TransportFeedbackDemuxer::AddPacket(const RtpPacketSendInfo& packet_info) {
+  rtc::CritScope cs(&lock_);
+  if (packet_info.has_rtp_sequence_number && packet_info.ssrc != 0) {
+    StreamFeedbackObserver::StreamPacketInfo info;
+    info.ssrc = packet_info.ssrc;
+    info.rtp_sequence_number = packet_info.rtp_sequence_number;
+    info.received = false;
+    history_.insert(
+        {seq_num_unwrapper_.Unwrap(packet_info.transport_sequence_number),
+         info});
+  }
+  while (history_.size() > kMaxPacketsInHistory) {
+    history_.erase(history_.begin());
+  }
+}
+
+void TransportFeedbackDemuxer::OnTransportFeedback(
+    const rtcp::TransportFeedback& feedback) {
+  std::vector<StreamFeedbackObserver::StreamPacketInfo> stream_feedbacks;
+  {
+    rtc::CritScope cs(&lock_);
+    for (const auto& packet : feedback.GetAllPackets()) {
+      int64_t seq_num =
+          seq_num_unwrapper_.UnwrapWithoutUpdate(packet.sequence_number());
+      auto it = history_.find(seq_num);
+      if (it != history_.end()) {
+        auto packet_info = it->second;
+        packet_info.received = packet.received();
+        stream_feedbacks.push_back(packet_info);
+        if (packet.received())
+          history_.erase(it);
+      }
+    }
+  }
+
+  rtc::CritScope cs(&observers_lock_);
+  for (auto& observer : observers_) {
+    std::vector<StreamFeedbackObserver::StreamPacketInfo> selected_feedback;
+    for (const auto& packet_info : stream_feedbacks) {
+      if (absl::c_count(observer.first, packet_info.ssrc) > 0) {
+        selected_feedback.push_back(packet_info);
+      }
+    }
+    if (!selected_feedback.empty()) {
+      observer.second->OnPacketFeedbackVector(std::move(selected_feedback));
+    }
+  }
+}
+
+}  // namespace webrtc
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer.h b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
new file mode 100644
index 0000000..bcd25d5
--- /dev/null
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer.h
@@ -0,0 +1,49 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
+#define MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
+
+#include <map>
+#include <utility>
+#include <vector>
+
+#include "modules/include/module_common_types_public.h"
+#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
+#include "rtc_base/critical_section.h"
+
+namespace webrtc {
+
+class TransportFeedbackDemuxer : public StreamFeedbackProvider {
+ public:
+  // Implements StreamFeedbackProvider interface
+  void RegisterStreamFeedbackObserver(
+      std::vector<uint32_t> ssrcs,
+      StreamFeedbackObserver* observer) override;
+  void DeRegisterStreamFeedbackObserver(
+      StreamFeedbackObserver* observer) override;
+  void AddPacket(const RtpPacketSendInfo& packet_info);
+  void OnTransportFeedback(const rtcp::TransportFeedback& feedback);
+
+ private:
+  rtc::CriticalSection lock_;
+  SequenceNumberUnwrapper seq_num_unwrapper_ RTC_GUARDED_BY(&lock_);
+  std::map<int64_t, StreamFeedbackObserver::StreamPacketInfo> history_
+      RTC_GUARDED_BY(&lock_);
+
+  // Maps a set of ssrcs to corresponding observer. Vectors are used rather than
+  // set/map to ensure that the processing order is consistent independently of
+  // the randomized ssrcs.
+  rtc::CriticalSection observers_lock_;
+  std::vector<std::pair<std::vector<uint32_t>, StreamFeedbackObserver*>>
+      observers_ RTC_GUARDED_BY(&observers_lock_);
+};
+}  // namespace webrtc
+
+#endif  // MODULES_CONGESTION_CONTROLLER_RTP_TRANSPORT_FEEDBACK_DEMUXER_H_
diff --git a/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc b/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc
new file mode 100644
index 0000000..144e3e1
--- /dev/null
+++ b/modules/congestion_controller/rtp/transport_feedback_demuxer_unittest.cc
@@ -0,0 +1,67 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "modules/congestion_controller/rtp/transport_feedback_demuxer.h"
+
+#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+namespace webrtc {
+namespace {
+
+using ::testing::_;
+static constexpr uint32_t kSsrc = 8492;
+
+class MockStreamFeedbackObserver : public webrtc::StreamFeedbackObserver {
+ public:
+  MOCK_METHOD1(OnPacketFeedbackVector,
+               void(std::vector<StreamPacketInfo> packet_feedback_vector));
+};
+
+RtpPacketSendInfo CreatePacket(uint32_t ssrc,
+                               int16_t rtp_sequence_number,
+                               int64_t transport_sequence_number) {
+  RtpPacketSendInfo res;
+  res.ssrc = ssrc;
+  res.transport_sequence_number = transport_sequence_number;
+  res.rtp_sequence_number = rtp_sequence_number;
+  res.has_rtp_sequence_number = true;
+  return res;
+}
+}  // namespace
+TEST(TransportFeedbackDemuxerTest, ObserverSanity) {
+  TransportFeedbackDemuxer demuxer;
+  MockStreamFeedbackObserver mock;
+  demuxer.RegisterStreamFeedbackObserver({kSsrc}, &mock);
+
+  demuxer.AddPacket(CreatePacket(kSsrc, 55, 1));
+  demuxer.AddPacket(CreatePacket(kSsrc, 56, 2));
+  demuxer.AddPacket(CreatePacket(kSsrc, 57, 3));
+
+  rtcp::TransportFeedback feedback;
+  feedback.SetBase(1, 1000);
+  ASSERT_TRUE(feedback.AddReceivedPacket(1, 1000));
+  ASSERT_TRUE(feedback.AddReceivedPacket(2, 2000));
+  ASSERT_TRUE(feedback.AddReceivedPacket(3, 3000));
+
+  EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(1);
+  demuxer.OnTransportFeedback(feedback);
+
+  demuxer.DeRegisterStreamFeedbackObserver(&mock);
+
+  demuxer.AddPacket(CreatePacket(kSsrc, 58, 4));
+  rtcp::TransportFeedback second_feedback;
+  second_feedback.SetBase(4, 4000);
+  ASSERT_TRUE(second_feedback.AddReceivedPacket(4, 4000));
+
+  EXPECT_CALL(mock, OnPacketFeedbackVector(_)).Times(0);
+  demuxer.OnTransportFeedback(second_feedback);
+}
+}  // namespace webrtc