Remove pending packets from the pacer when an RTP module is removed.

This CL adds functionality to remove packets matching a given SSRC from
the pacer queue, and calls that with any SSRCs used by an RTP module
when that module is removed.

Bug: chromium:1395081
Change-Id: I13c0285ddca600e784ad04a806727a508ede6dcc
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/287124
Reviewed-by: Jakob Ivarsson‎ <jakobi@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38880}
diff --git a/audio/channel_send.cc b/audio/channel_send.cc
index d260406..fdc11a8 100644
--- a/audio/channel_send.cc
+++ b/audio/channel_send.cc
@@ -262,6 +262,11 @@
     rtp_packet_pacer_->EnqueuePackets(std::move(packets));
   }
 
+  void RemovePacketsForSsrc(uint32_t ssrc) override {
+    MutexLock lock(&mutex_);
+    rtp_packet_pacer_->RemovePacketsForSsrc(ssrc);
+  }
+
  private:
   SequenceChecker thread_checker_;
   Mutex mutex_;
@@ -565,6 +570,7 @@
 
   RTC_DCHECK(packet_router_);
   packet_router_->RemoveSendRtpModule(rtp_rtcp_.get());
+  rtp_packet_pacer_proxy_->RemovePacketsForSsrc(rtp_rtcp_->SSRC());
 }
 
 void ChannelSend::SetEncoder(int payload_type,
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 3ca72b1..de19b97 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -515,6 +515,17 @@
       // prevent any stray packets in the pacer from asynchronously arriving
       // to a disabled module.
       transport_->packet_router()->RemoveSendRtpModule(&rtp_module);
+
+      // Clear the pacer queue of any packets pertaining to this module.
+      transport_->packet_sender()->RemovePacketsForSsrc(rtp_module.SSRC());
+      if (rtp_module.RtxSsrc().has_value()) {
+        transport_->packet_sender()->RemovePacketsForSsrc(
+            *rtp_module.RtxSsrc());
+      }
+      if (rtp_module.FlexfecSsrc().has_value()) {
+        transport_->packet_sender()->RemovePacketsForSsrc(
+            *rtp_module.FlexfecSsrc());
+      }
     }
 
     // If set to false this module won't send media.
diff --git a/call/rtp_video_sender_unittest.cc b/call/rtp_video_sender_unittest.cc
index a8595e3..da2bed6 100644
--- a/call/rtp_video_sender_unittest.cc
+++ b/call/rtp_video_sender_unittest.cc
@@ -1098,4 +1098,81 @@
   }
 }
 
+TEST(RtpVideoSenderTest, ClearsPendingPacketsOnInactivation) {
+  RtpVideoSenderTestFixture test({kSsrc1}, {kRtxSsrc1}, kPayloadType, {});
+  test.SetActiveModules({true});
+
+  RtpHeaderExtensionMap extensions;
+  extensions.Register<RtpDependencyDescriptorExtension>(
+      kDependencyDescriptorExtensionId);
+  std::vector<RtpPacket> sent_packets;
+  ON_CALL(test.transport(), SendRtp)
+      .WillByDefault([&](const uint8_t* packet, size_t length,
+                         const PacketOptions& options) {
+        sent_packets.emplace_back(&extensions);
+        EXPECT_TRUE(sent_packets.back().Parse(packet, length));
+        return true;
+      });
+
+  // Set a very low bitrate.
+  test.router()->OnBitrateUpdated(
+      CreateBitrateAllocationUpdate(/*rate_bps=*/30'000),
+      /*framerate=*/30);
+
+  // Create and send a large keyframe.
+  const size_t kImageSizeBytes = 10000;
+  constexpr uint8_t kPayload[kImageSizeBytes] = {'a'};
+  EncodedImage encoded_image;
+  encoded_image.SetTimestamp(1);
+  encoded_image.capture_time_ms_ = 2;
+  encoded_image._frameType = VideoFrameType::kVideoFrameKey;
+  encoded_image.SetEncodedData(
+      EncodedImageBuffer::Create(kPayload, sizeof(kPayload)));
+  EXPECT_EQ(test.router()
+                ->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
+                .error,
+            EncodedImageCallback::Result::OK);
+
+  // Advance time a small amount, check that sent data is only part of the
+  // image.
+  test.AdvanceTime(TimeDelta::Millis(5));
+  DataSize transmittedPayload = DataSize::Zero();
+  for (const RtpPacket& packet : sent_packets) {
+    transmittedPayload += DataSize::Bytes(packet.payload_size());
+    // Make sure we don't see the end of the frame.
+    EXPECT_FALSE(packet.Marker());
+  }
+  EXPECT_GT(transmittedPayload, DataSize::Zero());
+  EXPECT_LT(transmittedPayload, DataSize::Bytes(kImageSizeBytes / 4));
+
+  // Record the RTP timestamp of the first frame.
+  const uint32_t first_frame_timestamp = sent_packets[0].Timestamp();
+  sent_packets.clear();
+
+  // Disable the sending module and advance time slightly. No packets should be
+  // sent.
+  test.SetActiveModules({false});
+  test.AdvanceTime(TimeDelta::Millis(20));
+  EXPECT_TRUE(sent_packets.empty());
+
+  // Reactive the send module - any packets should have been removed, so nothing
+  // should be transmitted.
+  test.SetActiveModules({true});
+  test.AdvanceTime(TimeDelta::Millis(33));
+  EXPECT_TRUE(sent_packets.empty());
+
+  // Send a new frame.
+  encoded_image.SetTimestamp(3);
+  encoded_image.capture_time_ms_ = 4;
+  EXPECT_EQ(test.router()
+                ->OnEncodedImage(encoded_image, /*codec_specific=*/nullptr)
+                .error,
+            EncodedImageCallback::Result::OK);
+  test.AdvanceTime(TimeDelta::Millis(33));
+
+  // Advance time, check we get new packets - but only for the second frame.
+  EXPECT_FALSE(sent_packets.empty());
+  EXPECT_NE(sent_packets[0].Timestamp(), first_frame_timestamp);
+}
+
 }  // namespace webrtc
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index ca24874..2251e56 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -148,6 +148,10 @@
   circuit_breaker_threshold_ = num_iterations;
 }
 
+void PacingController::RemovePacketsForSsrc(uint32_t ssrc) {
+  packet_queue_.RemovePacketsForSsrc(ssrc);
+}
+
 bool PacingController::IsProbing() const {
   return prober_.is_probing();
 }
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index eb4fb4e..91c0548 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -166,6 +166,9 @@
   // is considered erroneous to exceed.
   void SetCircuitBreakerThreshold(int num_iterations);
 
+  // Remove any pending packets matching this SSRC from the packet queue.
+  void RemovePacketsForSsrc(uint32_t ssrc);
+
  private:
   TimeDelta UpdateTimeAndGetElapsed(Timestamp now);
   bool ShouldSendKeepalive(Timestamp now) const;
diff --git a/modules/pacing/prioritized_packet_queue.cc b/modules/pacing/prioritized_packet_queue.cc
index b3874a2..0c285c4 100644
--- a/modules/pacing/prioritized_packet_queue.cc
+++ b/modules/pacing/prioritized_packet_queue.cc
@@ -60,7 +60,7 @@
 }
 
 PrioritizedPacketQueue::QueuedPacket
-PrioritizedPacketQueue::StreamQueue::DequePacket(int priority_level) {
+PrioritizedPacketQueue::StreamQueue::DequeuePacket(int priority_level) {
   RTC_DCHECK(!packets_[priority_level].empty());
   QueuedPacket packet = std::move(packets_[priority_level].front());
   packets_[priority_level].pop_front();
@@ -91,6 +91,16 @@
   return last_enqueue_time_;
 }
 
+std::array<std::deque<PrioritizedPacketQueue::QueuedPacket>,
+           PrioritizedPacketQueue::kNumPriorityLevels>
+PrioritizedPacketQueue::StreamQueue::DequeueAll() {
+  std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio;
+  for (int i = 0; i < kNumPriorityLevels; ++i) {
+    packets_by_prio[i].swap(packets_[i]);
+  }
+  return packets_by_prio;
+}
+
 PrioritizedPacketQueue::PrioritizedPacketQueue(Timestamp creation_time)
     : queue_time_sum_(TimeDelta::Zero()),
       pause_time_sum_(TimeDelta::Zero()),
@@ -162,54 +172,16 @@
 
   RTC_DCHECK_GE(top_active_prio_level_, 0);
   StreamQueue& stream_queue = *streams_by_prio_[top_active_prio_level_].front();
-  QueuedPacket packet = stream_queue.DequePacket(top_active_prio_level_);
-  --size_packets_;
-  RTC_DCHECK(packet.packet->packet_type().has_value());
-  RtpPacketMediaType packet_type = packet.packet->packet_type().value();
-  --size_packets_per_media_type_[static_cast<size_t>(packet_type)];
-  RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
-                0);
-  size_payload_ -= packet.PacketSize();
-
-  // Calculate the total amount of time spent by this packet in the queue
-  // while in a non-paused state. Note that the `pause_time_sum_ms_` was
-  // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
-  // by subtracting it now we effectively remove the time spent in in the
-  // queue while in a paused state.
-  TimeDelta time_in_non_paused_state =
-      last_update_time_ - packet.enqueue_time - pause_time_sum_;
-  queue_time_sum_ -= time_in_non_paused_state;
-
-  // Set the time spent in the send queue, which is the per-packet equivalent of
-  // totalPacketSendDelay. The notion of being paused is an implementation
-  // detail that we do not want to expose, so it makes sense to report the
-  // metric excluding the pause time. This also avoids spikes in the metric.
-  // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
-  packet.packet->set_time_in_send_queue(time_in_non_paused_state);
-
-  RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
-
-  RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
-  enqueue_times_.erase(packet.enqueue_time_iterator);
+  QueuedPacket packet = stream_queue.DequeuePacket(top_active_prio_level_);
+  DequeuePacketInternal(packet);
 
   // Remove StreamQueue from head of fifo-queue for this prio level, and
   // and add it to the end if it still has packets.
   streams_by_prio_[top_active_prio_level_].pop_front();
   if (stream_queue.HasPacketsAtPrio(top_active_prio_level_)) {
     streams_by_prio_[top_active_prio_level_].push_back(&stream_queue);
-  } else if (streams_by_prio_[top_active_prio_level_].empty()) {
-    // No stream queues have packets at this prio level, find top priority
-    // that is not empty.
-    if (size_packets_ == 0) {
-      top_active_prio_level_ = -1;
-    } else {
-      for (int i = 0; i < kNumPriorityLevels; ++i) {
-        if (!streams_by_prio_[i].empty()) {
-          top_active_prio_level_ = i;
-          break;
-        }
-      }
-    }
+  } else {
+    MaybeUpdateTopPrioLevel();
   }
 
   return std::move(packet.packet);
@@ -276,4 +248,96 @@
   paused_ = paused;
 }
 
+void PrioritizedPacketQueue::RemovePacketsForSsrc(uint32_t ssrc) {
+  auto kv = streams_.find(ssrc);
+  if (kv != streams_.end()) {
+    // Dequeue all packets from the queue for this SSRC.
+    StreamQueue& queue = *kv->second;
+    std::array<std::deque<QueuedPacket>, kNumPriorityLevels> packets_by_prio =
+        queue.DequeueAll();
+    for (int i = 0; i < kNumPriorityLevels; ++i) {
+      std::deque<QueuedPacket>& packet_queue = packets_by_prio[i];
+      if (packet_queue.empty()) {
+        continue;
+      }
+
+      // First erase all packets at this prio level.
+      while (!packet_queue.empty()) {
+        QueuedPacket packet = std::move(packet_queue.front());
+        packet_queue.pop_front();
+        DequeuePacketInternal(packet);
+      }
+
+      // Next, deregister this `StreamQueue` from the round-robin tables.
+      RTC_DCHECK(!streams_by_prio_[i].empty());
+      if (streams_by_prio_[i].size() == 1) {
+        // This is the last and only queue that had packets for this prio level.
+        // Update the global top prio level if neccessary.
+        RTC_DCHECK(streams_by_prio_[i].front() == &queue);
+        streams_by_prio_[i].pop_front();
+        if (i == top_active_prio_level_) {
+          MaybeUpdateTopPrioLevel();
+        }
+      } else {
+        // More than stream had packets at this prio level, filter this one out.
+        std::deque<StreamQueue*> filtered_queue;
+        for (StreamQueue* queue_ptr : streams_by_prio_[i]) {
+          if (queue_ptr != &queue) {
+            filtered_queue.push_back(queue_ptr);
+          }
+        }
+        streams_by_prio_[i].swap(filtered_queue);
+      }
+    }
+  }
+}
+
+void PrioritizedPacketQueue::DequeuePacketInternal(QueuedPacket& packet) {
+  --size_packets_;
+  RTC_DCHECK(packet.packet->packet_type().has_value());
+  RtpPacketMediaType packet_type = packet.packet->packet_type().value();
+  --size_packets_per_media_type_[static_cast<size_t>(packet_type)];
+  RTC_DCHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
+                0);
+  size_payload_ -= packet.PacketSize();
+
+  // Calculate the total amount of time spent by this packet in the queue
+  // while in a non-paused state. Note that the `pause_time_sum_ms_` was
+  // subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
+  // by subtracting it now we effectively remove the time spent in in the
+  // queue while in a paused state.
+  TimeDelta time_in_non_paused_state =
+      last_update_time_ - packet.enqueue_time - pause_time_sum_;
+  queue_time_sum_ -= time_in_non_paused_state;
+
+  // Set the time spent in the send queue, which is the per-packet equivalent of
+  // totalPacketSendDelay. The notion of being paused is an implementation
+  // detail that we do not want to expose, so it makes sense to report the
+  // metric excluding the pause time. This also avoids spikes in the metric.
+  // https://w3c.github.io/webrtc-stats/#dom-rtcoutboundrtpstreamstats-totalpacketsenddelay
+  packet.packet->set_time_in_send_queue(time_in_non_paused_state);
+
+  RTC_DCHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
+
+  RTC_CHECK(packet.enqueue_time_iterator != enqueue_times_.end());
+  enqueue_times_.erase(packet.enqueue_time_iterator);
+}
+
+void PrioritizedPacketQueue::MaybeUpdateTopPrioLevel() {
+  if (streams_by_prio_[top_active_prio_level_].empty()) {
+    // No stream queues have packets at this prio level, find top priority
+    // that is not empty.
+    if (size_packets_ == 0) {
+      top_active_prio_level_ = -1;
+    } else {
+      for (int i = 0; i < kNumPriorityLevels; ++i) {
+        if (!streams_by_prio_[i].empty()) {
+          top_active_prio_level_ = i;
+          break;
+        }
+      }
+    }
+  }
+}
+
 }  // namespace webrtc
diff --git a/modules/pacing/prioritized_packet_queue.h b/modules/pacing/prioritized_packet_queue.h
index 3b5748f..364b53a 100644
--- a/modules/pacing/prioritized_packet_queue.h
+++ b/modules/pacing/prioritized_packet_queue.h
@@ -13,10 +13,12 @@
 
 #include <stddef.h>
 
+#include <array>
 #include <deque>
 #include <list>
 #include <memory>
 #include <unordered_map>
+#include <vector>
 
 #include "api/units/data_size.h"
 #include "api/units/time_delta.h"
@@ -80,6 +82,9 @@
   // Set the pause state, while `paused` is true queuing time is not counted.
   void SetPauseState(bool paused, Timestamp now);
 
+  // Remove any packets matching the given SSRC.
+  void RemovePacketsForSsrc(uint32_t ssrc);
+
  private:
   static constexpr int kNumPriorityLevels = 4;
 
@@ -107,18 +112,27 @@
     // count for that priority level went from zero to non-zero.
     bool EnqueuePacket(QueuedPacket packet, int priority_level);
 
-    QueuedPacket DequePacket(int priority_level);
+    QueuedPacket DequeuePacket(int priority_level);
 
     bool HasPacketsAtPrio(int priority_level) const;
     bool IsEmpty() const;
     Timestamp LeadingPacketEnqueueTime(int priority_level) const;
     Timestamp LastEnqueueTime() const;
 
+    std::array<std::deque<QueuedPacket>, kNumPriorityLevels> DequeueAll();
+
    private:
     std::deque<QueuedPacket> packets_[kNumPriorityLevels];
     Timestamp last_enqueue_time_;
   };
 
+  // Remove the packet from the internal state, e.g. queue time / size etc.
+  void DequeuePacketInternal(QueuedPacket& packet);
+
+  // Check if the queue pointed to by `top_active_prio_level_` is empty and
+  // if so move it to the lowest non-empty index.
+  void MaybeUpdateTopPrioLevel();
+
   // Cumulative sum, over all packets, of time spent in the queue.
   TimeDelta queue_time_sum_;
   // Cumulative sum of time the queue has spent in a paused state.
diff --git a/modules/pacing/prioritized_packet_queue_unittest.cc b/modules/pacing/prioritized_packet_queue_unittest.cc
index 5e79e7b..964051c 100644
--- a/modules/pacing/prioritized_packet_queue_unittest.cc
+++ b/modules/pacing/prioritized_packet_queue_unittest.cc
@@ -306,4 +306,58 @@
   }
 }
 
+TEST(PrioritizedPacketQueue, ClearsPackets) {
+  Timestamp now = Timestamp::Zero();
+  PrioritizedPacketQueue queue(now);
+  const uint32_t kSsrc = 1;
+
+  // Add two packets of each type, all using the same SSRC.
+  int sequence_number = 0;
+  for (size_t i = 0; i < kNumMediaTypes; ++i) {
+    queue.Push(now, CreatePacket(static_cast<RtpPacketMediaType>(i),
+                                 sequence_number++, kSsrc));
+    queue.Push(now, CreatePacket(static_cast<RtpPacketMediaType>(i),
+                                 sequence_number++, kSsrc));
+  }
+  EXPECT_EQ(queue.SizeInPackets(), 2 * int{kNumMediaTypes});
+
+  // Remove all of them.
+  queue.RemovePacketsForSsrc(kSsrc);
+  EXPECT_TRUE(queue.Empty());
+}
+
+TEST(PrioritizedPacketQueue, ClearPacketsAffectsOnlySpecifiedSsrc) {
+  Timestamp now = Timestamp::Zero();
+  PrioritizedPacketQueue queue(now);
+  const uint32_t kRemovingSsrc = 1;
+  const uint32_t kStayingSsrc = 2;
+
+  // Add an audio packet and a retransmission for the SSRC we will remove,
+  // ensuring they are first in line.
+  queue.Push(
+      now, CreatePacket(RtpPacketMediaType::kAudio, /*seq=*/1, kRemovingSsrc));
+  queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/2,
+                               kRemovingSsrc));
+
+  // Add a video packet and a retransmission for the SSRC that will remain.
+  // The retransmission packets now both have pointers to their respective qeues
+  // from the same prio level.
+  queue.Push(now,
+             CreatePacket(RtpPacketMediaType::kVideo, /*seq=*/3, kStayingSsrc));
+  queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, /*seq=*/4,
+                               kStayingSsrc));
+
+  EXPECT_EQ(queue.SizeInPackets(), 4);
+
+  // Clear the first two packets.
+  queue.RemovePacketsForSsrc(kRemovingSsrc);
+  EXPECT_EQ(queue.SizeInPackets(), 2);
+
+  // We should get the single remaining retransmission first, then the video
+  // packet.
+  EXPECT_EQ(queue.Pop()->SequenceNumber(), 4);
+  EXPECT_EQ(queue.Pop()->SequenceNumber(), 3);
+  EXPECT_TRUE(queue.Empty());
+}
+
 }  // namespace webrtc
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index e8b695c..4ba2495 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -181,6 +181,14 @@
       }));
 }
 
+void TaskQueuePacedSender::RemovePacketsForSsrc(uint32_t ssrc) {
+  task_queue_.RunOrPost([this, ssrc]() {
+    RTC_DCHECK_RUN_ON(&task_queue_);
+    pacing_controller_.RemovePacketsForSsrc(ssrc);
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+  });
+}
+
 void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
   task_queue_.RunOrPost([this, account_for_audio]() {
     RTC_DCHECK_RUN_ON(&task_queue_);
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 42aab11..ea335fd 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -70,6 +70,8 @@
   // PacingController::PacketSender::SendPacket() when it's time to send.
   void EnqueuePackets(
       std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
+  // Remove any pending packets matching this SSRC from the packet queue.
+  void RemovePacketsForSsrc(uint32_t ssrc) override;
 
   // Methods implementing RtpPacketPacer.
 
diff --git a/modules/rtp_rtcp/include/rtp_packet_sender.h b/modules/rtp_rtcp/include/rtp_packet_sender.h
index ae221b0..ebc6529 100644
--- a/modules/rtp_rtcp/include/rtp_packet_sender.h
+++ b/modules/rtp_rtcp/include/rtp_packet_sender.h
@@ -28,6 +28,11 @@
   // packets and the current target send rate.
   virtual void EnqueuePackets(
       std::vector<std::unique_ptr<RtpPacketToSend>> packets) = 0;
+
+  // Clear any pending packets with the given SSRC from the queue.
+  // TODO(crbug.com/1395081): Make pure virtual when downstream code has been
+  // updated.
+  virtual void RemovePacketsForSsrc(uint32_t ssrc) {}
 };
 
 }  // namespace webrtc
diff --git a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
index da833f5..609a90d 100644
--- a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
@@ -43,6 +43,7 @@
 
     void EnqueuePackets(
         std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
+    void RemovePacketsForSsrc(uint32_t ssrc) override {}
 
    private:
     uint16_t transport_sequence_number_;
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h
index ab62edd..e0a8d96 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.h
@@ -49,6 +49,8 @@
 
     void EnqueuePackets(
         std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
+    // Since we don't pace packets, there's no pending packets to remove.
+    void RemovePacketsForSsrc(uint32_t ssrc) override {}
 
    private:
     void PrepareForSend(RtpPacketToSend* packet);
diff --git a/modules/rtp_rtcp/source/rtp_sender_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
index ea9277f..c9e98ff 100644
--- a/modules/rtp_rtcp/source/rtp_sender_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_unittest.cc
@@ -102,6 +102,7 @@
               EnqueuePackets,
               (std::vector<std::unique_ptr<RtpPacketToSend>>),
               (override));
+  MOCK_METHOD(void, RemovePacketsForSsrc, (uint32_t), (override));
 };
 
 }  // namespace