Prepare packet router for flushing mechanism.

This CL adds the ability to forward aborted retransmission notifications
to specified RTP modules, as well as a way to find the RTX ssrc
associated with a media SSRC.
These will both be used by upcoming logic that can selectively flush
given streams from the pacer queue.

Bug: webrtc:11340
Change-Id: Ief3be47e4fd7dc5a1499bc21890e8979400ecb44
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274706
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38050}
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 9899fcd..988fe50 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -52,6 +52,15 @@
     virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
     virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
         DataSize size) = 0;
+
+    // TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
+    // have been updated.
+    virtual void OnAbortedRetransmissions(
+        uint32_t ssrc,
+        rtc::ArrayView<const uint16_t> sequence_numbers) {}
+    virtual absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const {
+      return absl::nullopt;
+    }
   };
 
   // Expected max pacer delay. If ExpectedQueueTime() is higher than
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index db6a22b..37b8605 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -128,6 +128,14 @@
               (),
               (override));
   MOCK_METHOD(size_t, SendPadding, (size_t target_size));
+  MOCK_METHOD(void,
+              OnAbortedRetransmissions,
+              (uint32_t, rtc::ArrayView<const uint16_t>),
+              (override));
+  MOCK_METHOD(absl::optional<uint32_t>,
+              GetRtxSsrcForMedia,
+              (uint32_t),
+              (const, override));
 };
 
 // Mock callback implementing the raw api.
@@ -147,6 +155,14 @@
               GeneratePadding,
               (DataSize target_size),
               (override));
+  MOCK_METHOD(void,
+              OnAbortedRetransmissions,
+              (uint32_t, rtc::ArrayView<const uint16_t>),
+              (override));
+  MOCK_METHOD(absl::optional<uint32_t>,
+              GetRtxSsrcForMedia,
+              (uint32_t),
+              (const, override));
 };
 
 class PacingControllerPadding : public PacingController::PacketSender {
@@ -178,6 +194,12 @@
     return packets;
   }
 
+  void OnAbortedRetransmissions(uint32_t,
+                                rtc::ArrayView<const uint16_t>) override {}
+  absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
+    return absl::nullopt;
+  }
+
   size_t padding_sent() { return padding_sent_; }
   size_t total_bytes_sent() { return total_bytes_sent_; }
 
@@ -220,6 +242,12 @@
     return packets;
   }
 
+  void OnAbortedRetransmissions(uint32_t,
+                                rtc::ArrayView<const uint16_t>) override {}
+  absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
+    return absl::nullopt;
+  }
+
   int packets_sent() const { return packets_sent_; }
   int padding_sent() const { return padding_sent_; }
   int total_packets_sent() const { return packets_sent_ + padding_sent_; }
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a09f191..b28d977 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -86,10 +86,10 @@
 }
 
 void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
-  auto kv = send_modules_map_.find(ssrc);
-  RTC_DCHECK(kv != send_modules_map_.end());
-  send_modules_list_.remove(kv->second);
-  send_modules_map_.erase(kv);
+  auto it = send_modules_map_.find(ssrc);
+  RTC_DCHECK(it != send_modules_map_.end());
+  send_modules_list_.remove(it->second);
+  send_modules_map_.erase(it);
 }
 
 void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
@@ -151,8 +151,8 @@
   }
 
   uint32_t ssrc = packet->Ssrc();
-  auto kv = send_modules_map_.find(ssrc);
-  if (kv == send_modules_map_.end()) {
+  auto it = send_modules_map_.find(ssrc);
+  if (it == send_modules_map_.end()) {
     RTC_LOG(LS_WARNING)
         << "Failed to send packet, matching RTP module not found "
            "or transport error. SSRC = "
@@ -160,7 +160,7 @@
     return;
   }
 
-  RtpRtcpInterface* rtp_module = kv->second;
+  RtpRtcpInterface* rtp_module = it->second;
   if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
     RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
     return;
@@ -235,6 +235,27 @@
   return padding_packets;
 }
 
+void PacketRouter::OnAbortedRetransmissions(
+    uint32_t ssrc,
+    rtc::ArrayView<const uint16_t> sequence_numbers) {
+  MutexLock lock(&modules_mutex_);
+  auto it = send_modules_map_.find(ssrc);
+  if (it != send_modules_map_.end()) {
+    it->second->OnAbortedRetransmissions(sequence_numbers);
+  }
+}
+
+absl::optional<uint32_t> PacketRouter::GetRtxSsrcForMedia(uint32_t ssrc) const {
+  MutexLock lock(&modules_mutex_);
+  auto it = send_modules_map_.find(ssrc);
+  if (it != send_modules_map_.end() && it->second->SSRC() == ssrc) {
+    // A module is registered with the given SSRC, and that SSRC is the main
+    // media SSRC for that RTP module.
+    return it->second->RtxSsrc();
+  }
+  return absl::nullopt;
+}
+
 uint16_t PacketRouter::CurrentTransportSequenceNumber() const {
   MutexLock lock(&modules_mutex_);
   return transport_seq_ & 0xFFFF;
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 11d8979..68b82c6 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -58,6 +58,10 @@
   std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() override;
   std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
       DataSize size) override;
+  void OnAbortedRetransmissions(
+      uint32_t ssrc,
+      rtc::ArrayView<const uint16_t> sequence_numbers) override;
+  absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
 
   uint16_t CurrentTransportSequenceNumber() const;
 
diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc
index fc26922..65b2ad2 100644
--- a/modules/pacing/packet_router_unittest.cc
+++ b/modules/pacing/packet_router_unittest.cc
@@ -36,6 +36,7 @@
 using ::testing::_;
 using ::testing::AnyNumber;
 using ::testing::AtLeast;
+using ::testing::ElementsAreArray;
 using ::testing::Field;
 using ::testing::Gt;
 using ::testing::Le;
@@ -436,6 +437,68 @@
   packet_router_.RemoveSendRtpModule(&rtp);
 }
 
+TEST_F(PacketRouterTest, ForwardsAbortedRetransmissions) {
+  NiceMock<MockRtpRtcpInterface> rtp_1;
+  NiceMock<MockRtpRtcpInterface> rtp_2;
+
+  const uint32_t kSsrc1 = 1234;
+  const uint32_t kSsrc2 = 2345;
+  const uint32_t kInvalidSsrc = 3456;
+
+  ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+  ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+
+  packet_router_.AddSendRtpModule(&rtp_1, false);
+  packet_router_.AddSendRtpModule(&rtp_2, false);
+
+  // Sets of retransmission sequence numbers we wish to abort, per ssrc.
+  const uint16_t kAbortedRetransmissionsOnSsrc1[] = {17, 42};
+  const uint16_t kAbortedRetransmissionsOnSsrc2[] = {1337, 4711};
+  const uint16_t kAbortedRetransmissionsOnSsrc3[] = {123};
+
+  EXPECT_CALL(rtp_1, OnAbortedRetransmissions(
+                         ElementsAreArray(kAbortedRetransmissionsOnSsrc1)));
+  EXPECT_CALL(rtp_2, OnAbortedRetransmissions(
+                         ElementsAreArray(kAbortedRetransmissionsOnSsrc2)));
+
+  packet_router_.OnAbortedRetransmissions(kSsrc1,
+                                          kAbortedRetransmissionsOnSsrc1);
+  packet_router_.OnAbortedRetransmissions(kSsrc2,
+                                          kAbortedRetransmissionsOnSsrc2);
+
+  // Should be noop and not cause any issues.
+  packet_router_.OnAbortedRetransmissions(kInvalidSsrc,
+                                          kAbortedRetransmissionsOnSsrc3);
+
+  packet_router_.RemoveSendRtpModule(&rtp_1);
+  packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
+TEST_F(PacketRouterTest, ReportsRtxSsrc) {
+  NiceMock<MockRtpRtcpInterface> rtp_1;
+  NiceMock<MockRtpRtcpInterface> rtp_2;
+
+  const uint32_t kSsrc1 = 1234;
+  const uint32_t kRtxSsrc1 = 1235;
+  const uint32_t kSsrc2 = 2345;
+  const uint32_t kInvalidSsrc = 3456;
+
+  ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+  ON_CALL(rtp_1, RtxSsrc).WillByDefault(Return(kRtxSsrc1));
+  ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+
+  packet_router_.AddSendRtpModule(&rtp_1, false);
+  packet_router_.AddSendRtpModule(&rtp_2, false);
+
+  EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc1), kRtxSsrc1);
+  EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kRtxSsrc1), absl::nullopt);
+  EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc2), absl::nullopt);
+  EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kInvalidSsrc), absl::nullopt);
+
+  packet_router_.RemoveSendRtpModule(&rtp_1);
+  packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
 using PacketRouterDeathTest = PacketRouterTest;
 TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {