Prepare packet router for flushing mechanism.
This CL adds the ability to forward aborted retransmission notifications
to specified RTP modules, as well as a way to find the RTX ssrc
associated with a media SSRC.
These will both be used by upcoming logic that can selectively flush
given streams from the pacer queue.
Bug: webrtc:11340
Change-Id: Ief3be47e4fd7dc5a1499bc21890e8979400ecb44
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274706
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38050}
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index 9899fcd..988fe50 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -52,6 +52,15 @@
virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) = 0;
+
+ // TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
+ // have been updated.
+ virtual void OnAbortedRetransmissions(
+ uint32_t ssrc,
+ rtc::ArrayView<const uint16_t> sequence_numbers) {}
+ virtual absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const {
+ return absl::nullopt;
+ }
};
// Expected max pacer delay. If ExpectedQueueTime() is higher than
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index db6a22b..37b8605 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -128,6 +128,14 @@
(),
(override));
MOCK_METHOD(size_t, SendPadding, (size_t target_size));
+ MOCK_METHOD(void,
+ OnAbortedRetransmissions,
+ (uint32_t, rtc::ArrayView<const uint16_t>),
+ (override));
+ MOCK_METHOD(absl::optional<uint32_t>,
+ GetRtxSsrcForMedia,
+ (uint32_t),
+ (const, override));
};
// Mock callback implementing the raw api.
@@ -147,6 +155,14 @@
GeneratePadding,
(DataSize target_size),
(override));
+ MOCK_METHOD(void,
+ OnAbortedRetransmissions,
+ (uint32_t, rtc::ArrayView<const uint16_t>),
+ (override));
+ MOCK_METHOD(absl::optional<uint32_t>,
+ GetRtxSsrcForMedia,
+ (uint32_t),
+ (const, override));
};
class PacingControllerPadding : public PacingController::PacketSender {
@@ -178,6 +194,12 @@
return packets;
}
+ void OnAbortedRetransmissions(uint32_t,
+ rtc::ArrayView<const uint16_t>) override {}
+ absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
+ return absl::nullopt;
+ }
+
size_t padding_sent() { return padding_sent_; }
size_t total_bytes_sent() { return total_bytes_sent_; }
@@ -220,6 +242,12 @@
return packets;
}
+ void OnAbortedRetransmissions(uint32_t,
+ rtc::ArrayView<const uint16_t>) override {}
+ absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
+ return absl::nullopt;
+ }
+
int packets_sent() const { return packets_sent_; }
int padding_sent() const { return padding_sent_; }
int total_packets_sent() const { return packets_sent_ + padding_sent_; }
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a09f191..b28d977 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -86,10 +86,10 @@
}
void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
- auto kv = send_modules_map_.find(ssrc);
- RTC_DCHECK(kv != send_modules_map_.end());
- send_modules_list_.remove(kv->second);
- send_modules_map_.erase(kv);
+ auto it = send_modules_map_.find(ssrc);
+ RTC_DCHECK(it != send_modules_map_.end());
+ send_modules_list_.remove(it->second);
+ send_modules_map_.erase(it);
}
void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
@@ -151,8 +151,8 @@
}
uint32_t ssrc = packet->Ssrc();
- auto kv = send_modules_map_.find(ssrc);
- if (kv == send_modules_map_.end()) {
+ auto it = send_modules_map_.find(ssrc);
+ if (it == send_modules_map_.end()) {
RTC_LOG(LS_WARNING)
<< "Failed to send packet, matching RTP module not found "
"or transport error. SSRC = "
@@ -160,7 +160,7 @@
return;
}
- RtpRtcpInterface* rtp_module = kv->second;
+ RtpRtcpInterface* rtp_module = it->second;
if (!rtp_module->TrySendPacket(packet.get(), cluster_info)) {
RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
return;
@@ -235,6 +235,27 @@
return padding_packets;
}
+void PacketRouter::OnAbortedRetransmissions(
+ uint32_t ssrc,
+ rtc::ArrayView<const uint16_t> sequence_numbers) {
+ MutexLock lock(&modules_mutex_);
+ auto it = send_modules_map_.find(ssrc);
+ if (it != send_modules_map_.end()) {
+ it->second->OnAbortedRetransmissions(sequence_numbers);
+ }
+}
+
+absl::optional<uint32_t> PacketRouter::GetRtxSsrcForMedia(uint32_t ssrc) const {
+ MutexLock lock(&modules_mutex_);
+ auto it = send_modules_map_.find(ssrc);
+ if (it != send_modules_map_.end() && it->second->SSRC() == ssrc) {
+ // A module is registered with the given SSRC, and that SSRC is the main
+ // media SSRC for that RTP module.
+ return it->second->RtxSsrc();
+ }
+ return absl::nullopt;
+}
+
uint16_t PacketRouter::CurrentTransportSequenceNumber() const {
MutexLock lock(&modules_mutex_);
return transport_seq_ & 0xFFFF;
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 11d8979..68b82c6 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -58,6 +58,10 @@
std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() override;
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override;
+ void OnAbortedRetransmissions(
+ uint32_t ssrc,
+ rtc::ArrayView<const uint16_t> sequence_numbers) override;
+ absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
uint16_t CurrentTransportSequenceNumber() const;
diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc
index fc26922..65b2ad2 100644
--- a/modules/pacing/packet_router_unittest.cc
+++ b/modules/pacing/packet_router_unittest.cc
@@ -36,6 +36,7 @@
using ::testing::_;
using ::testing::AnyNumber;
using ::testing::AtLeast;
+using ::testing::ElementsAreArray;
using ::testing::Field;
using ::testing::Gt;
using ::testing::Le;
@@ -436,6 +437,68 @@
packet_router_.RemoveSendRtpModule(&rtp);
}
+TEST_F(PacketRouterTest, ForwardsAbortedRetransmissions) {
+ NiceMock<MockRtpRtcpInterface> rtp_1;
+ NiceMock<MockRtpRtcpInterface> rtp_2;
+
+ const uint32_t kSsrc1 = 1234;
+ const uint32_t kSsrc2 = 2345;
+ const uint32_t kInvalidSsrc = 3456;
+
+ ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+ ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+
+ packet_router_.AddSendRtpModule(&rtp_1, false);
+ packet_router_.AddSendRtpModule(&rtp_2, false);
+
+ // Sets of retransmission sequence numbers we wish to abort, per ssrc.
+ const uint16_t kAbortedRetransmissionsOnSsrc1[] = {17, 42};
+ const uint16_t kAbortedRetransmissionsOnSsrc2[] = {1337, 4711};
+ const uint16_t kAbortedRetransmissionsOnSsrc3[] = {123};
+
+ EXPECT_CALL(rtp_1, OnAbortedRetransmissions(
+ ElementsAreArray(kAbortedRetransmissionsOnSsrc1)));
+ EXPECT_CALL(rtp_2, OnAbortedRetransmissions(
+ ElementsAreArray(kAbortedRetransmissionsOnSsrc2)));
+
+ packet_router_.OnAbortedRetransmissions(kSsrc1,
+ kAbortedRetransmissionsOnSsrc1);
+ packet_router_.OnAbortedRetransmissions(kSsrc2,
+ kAbortedRetransmissionsOnSsrc2);
+
+ // Should be noop and not cause any issues.
+ packet_router_.OnAbortedRetransmissions(kInvalidSsrc,
+ kAbortedRetransmissionsOnSsrc3);
+
+ packet_router_.RemoveSendRtpModule(&rtp_1);
+ packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
+TEST_F(PacketRouterTest, ReportsRtxSsrc) {
+ NiceMock<MockRtpRtcpInterface> rtp_1;
+ NiceMock<MockRtpRtcpInterface> rtp_2;
+
+ const uint32_t kSsrc1 = 1234;
+ const uint32_t kRtxSsrc1 = 1235;
+ const uint32_t kSsrc2 = 2345;
+ const uint32_t kInvalidSsrc = 3456;
+
+ ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+ ON_CALL(rtp_1, RtxSsrc).WillByDefault(Return(kRtxSsrc1));
+ ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+
+ packet_router_.AddSendRtpModule(&rtp_1, false);
+ packet_router_.AddSendRtpModule(&rtp_2, false);
+
+ EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc1), kRtxSsrc1);
+ EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kRtxSsrc1), absl::nullopt);
+ EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kSsrc2), absl::nullopt);
+ EXPECT_EQ(packet_router_.GetRtxSsrcForMedia(kInvalidSsrc), absl::nullopt);
+
+ packet_router_.RemoveSendRtpModule(&rtp_1);
+ packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
using PacketRouterDeathTest = PacketRouterTest;
TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {