Introduce support for video packet batching.
This CL introduces a new feature enabling video packet send batches.
The feature is enabled via
PeerConnectionInterface
::RTCConfiguration
::MediaConfig
::enable_send_packet_batching.
PacketOptions have been augmented with attribute "batchable" (set for
all video packets) and attribute "last_packet_in_batch" which gives
injected AsyncPacketSockets a chance to understand when a batch begins
and ends.
When the feature is on, packets are collected in RtpSenderEgress. On
reception of OnBatchComplete from PacingController, RtpSenderEgress
sends the collected batch, setting "last_packet_in_batch" to true
in the last packet.
Bug: chromium:1439830
Change-Id: I1846b9d4a8a0efd227d617691213a2e048bdc8a2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/303720
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40012}
diff --git a/api/call/transport.h b/api/call/transport.h
index 8bff288..387ce8d 100644
--- a/api/call/transport.h
+++ b/api/call/transport.h
@@ -36,6 +36,10 @@
bool is_retransmit = false;
bool included_in_feedback = false;
bool included_in_allocation = false;
+ // Whether this packet can be part of a packet batch at lower levels.
+ bool batchable = false;
+ // Whether this packet is the last of a batch.
+ bool last_packet_in_batch = false;
};
class Transport {
diff --git a/call/call_config.h b/call/call_config.h
index 6df4ab7..918c077 100644
--- a/call/call_config.h
+++ b/call/call_config.h
@@ -81,6 +81,9 @@
// The burst interval of the pacer, see TaskQueuePacedSender constructor.
absl::optional<TimeDelta> pacer_burst_interval;
+
+ // Enables send packet batching from the egress RTP sender.
+ bool enable_send_packet_batching = false;
};
} // namespace webrtc
diff --git a/call/rtp_config.h b/call/rtp_config.h
index 0cc9466..a01a902 100644
--- a/call/rtp_config.h
+++ b/call/rtp_config.h
@@ -159,6 +159,9 @@
// RTCP CNAME, see RFC 3550.
std::string c_name;
+ // Enables send packet batching from the egress RTP sender.
+ bool enable_send_packet_batching = false;
+
bool IsMediaSsrc(uint32_t ssrc) const;
bool IsRtxSsrc(uint32_t ssrc) const;
bool IsFlexfecSsrc(uint32_t ssrc) const;
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 9108e83..27040f3 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -235,6 +235,8 @@
configuration.extmap_allow_mixed = rtp_config.extmap_allow_mixed;
configuration.rtcp_report_interval_ms = rtcp_report_interval_ms;
configuration.field_trials = &trials;
+ configuration.enable_send_packet_batching =
+ rtp_config.enable_send_packet_batching;
std::vector<RtpStreamSender> rtp_streams;
diff --git a/media/base/media_channel_impl.cc b/media/base/media_channel_impl.cc
index ca2a117..0de04b6 100644
--- a/media/base/media_channel_impl.cc
+++ b/media/base/media_channel_impl.cc
@@ -203,6 +203,8 @@
[this, packet_id = options.packet_id,
included_in_feedback = options.included_in_feedback,
included_in_allocation = options.included_in_allocation,
+ batchable = options.batchable,
+ last_packet_in_batch = options.last_packet_in_batch,
packet = rtc::CopyOnWriteBuffer(data, len, kMaxRtpPacketLen)]() mutable {
rtc::PacketOptions rtc_options;
rtc_options.packet_id = packet_id;
@@ -213,6 +215,8 @@
included_in_feedback;
rtc_options.info_signaled_after_sent.included_in_allocation =
included_in_allocation;
+ rtc_options.batchable = batchable;
+ rtc_options.last_packet_in_batch = last_packet_in_batch;
SendPacket(&packet, rtc_options);
};
diff --git a/media/base/media_config.h b/media/base/media_config.h
index b383c9a..7827705 100644
--- a/media/base/media_config.h
+++ b/media/base/media_config.h
@@ -62,6 +62,9 @@
// Time interval between RTCP report for video
int rtcp_report_interval_ms = 1000;
+
+ // Enables send packet batching from the egress RTP sender.
+ bool enable_send_packet_batching = false;
} video;
// Audio-specific config.
@@ -82,6 +85,8 @@
video.experiment_cpu_load_estimator ==
o.video.experiment_cpu_load_estimator &&
video.rtcp_report_interval_ms == o.video.rtcp_report_interval_ms &&
+ video.enable_send_packet_batching ==
+ o.video.enable_send_packet_batching &&
audio.rtcp_report_interval_ms == o.audio.rtcp_report_interval_ms;
}
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index cc1edc8..04056da 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1386,6 +1386,8 @@
config.crypto_options = crypto_options_;
config.rtp.extmap_allow_mixed = ExtmapAllowMixed();
config.rtcp_report_interval_ms = video_config_.rtcp_report_interval_ms;
+ config.rtp.enable_send_packet_batching =
+ video_config_.enable_send_packet_batching;
WebRtcVideoSendStream* stream = new WebRtcVideoSendStream(
call_, sp, std::move(config), default_send_options_,
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index fb36f92..ea80c8c 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -62,6 +62,7 @@
"../utility:utility",
]
absl_deps = [
+ "//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index a526fc13..495aaeb 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -15,6 +15,7 @@
#include <utility>
#include <vector>
+#include "absl/cleanup/cleanup.h"
#include "absl/strings/match.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
@@ -374,6 +375,9 @@
}
void PacingController::ProcessPackets() {
+ absl::Cleanup cleanup = [packet_sender = packet_sender_] {
+ packet_sender->OnBatchComplete();
+ };
const Timestamp now = CurrentTime();
Timestamp target_send_time = now;
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index b0d802b..2145868 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -52,6 +52,8 @@
virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) = 0;
+ // TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt.
+ virtual void OnBatchComplete() {}
// TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
// have been updated.
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index 2c5f8e9..ade71cd 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -138,6 +138,7 @@
GetRtxSsrcForMedia,
(uint32_t),
(const, override));
+ MOCK_METHOD(void, OnBatchComplete, (), (override));
};
// Mock callback implementing the raw api.
@@ -165,6 +166,7 @@
GetRtxSsrcForMedia,
(uint32_t),
(const, override));
+ MOCK_METHOD(void, OnBatchComplete, (), (override));
};
class PacingControllerPadding : public PacingController::PacketSender {
@@ -202,6 +204,8 @@
return absl::nullopt;
}
+ void OnBatchComplete() override {}
+
size_t padding_sent() { return padding_sent_; }
size_t total_bytes_sent() { return total_bytes_sent_; }
@@ -260,6 +264,7 @@
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
return absl::nullopt;
}
+ void OnBatchComplete() override {}
int packets_sent() const { return packets_sent_; }
int padding_packets_sent() const { return padding_packets_sent_; }
@@ -1454,6 +1459,13 @@
pacer->ProcessPackets();
}
+TEST_F(PacingControllerTest, ProvidesOnBatchCompleteToPacketSender) {
+ MockPacketSender callback;
+ auto pacer = std::make_unique<PacingController>(&clock_, &callback, trials_);
+ EXPECT_CALL(callback, OnBatchComplete);
+ pacer->ProcessPackets();
+}
+
TEST_F(PacingControllerTest, ProbeClusterId) {
MockPacketSender callback;
uint32_t ssrc = 12346;
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a25f692..135b618 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -90,6 +90,7 @@
auto it = send_modules_map_.find(ssrc);
RTC_DCHECK(it != send_modules_map_.end());
send_modules_list_.remove(it->second);
+ RTC_CHECK(modules_used_in_current_batch_.empty());
send_modules_map_.erase(it);
}
@@ -166,6 +167,7 @@
RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
return;
}
+ modules_used_in_current_batch_.insert(rtp_module);
// Sending succeeded.
@@ -184,6 +186,16 @@
}
}
+void PacketRouter::OnBatchComplete() {
+ RTC_DCHECK_RUN_ON(&thread_checker_);
+ TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
+ "PacketRouter::OnBatchComplete");
+ for (auto& module : modules_used_in_current_batch_) {
+ module->OnBatchComplete();
+ }
+ modules_used_in_current_batch_.clear();
+}
+
std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::FetchFec() {
RTC_DCHECK_RUN_ON(&thread_checker_);
std::vector<std::unique_ptr<RtpPacketToSend>> fec_packets =
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 805f422..61779f4 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -16,6 +16,7 @@
#include <list>
#include <memory>
+#include <set>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -62,6 +63,7 @@
uint32_t ssrc,
rtc::ArrayView<const uint16_t> sequence_numbers) override;
absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
+ void OnBatchComplete() override;
uint16_t CurrentTransportSequenceNumber() const;
@@ -108,6 +110,8 @@
std::vector<std::unique_ptr<RtpPacketToSend>> pending_fec_packets_
RTC_GUARDED_BY(thread_checker_);
+ std::set<RtpRtcpInterface*> modules_used_in_current_batch_
+ RTC_GUARDED_BY(thread_checker_);
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_ROUTER_H_
diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc
index 2528129..7604de6 100644
--- a/modules/pacing/packet_router_unittest.cc
+++ b/modules/pacing/packet_router_unittest.cc
@@ -252,6 +252,7 @@
// If the last active module is removed, and no module sends media before
// the next padding request, and arbitrary module will be selected.
+ packet_router_.OnBatchComplete();
packet_router_.RemoveSendRtpModule(&rtp_2);
// Send on and then remove all remaining modules.
@@ -301,6 +302,7 @@
packet_router.CurrentTransportSequenceNumber());
}
+ packet_router.OnBatchComplete();
packet_router.RemoveSendRtpModule(&rtp_1);
}
@@ -346,7 +348,7 @@
_))
.WillOnce(Return(true));
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
-
+ packet_router_.OnBatchComplete();
packet_router_.RemoveSendRtpModule(&rtp_1);
}
@@ -390,6 +392,7 @@
.WillOnce(Return(true));
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
+ packet_router_.OnBatchComplete();
packet_router_.RemoveSendRtpModule(&rtp_1);
packet_router_.RemoveSendRtpModule(&rtp_2);
}
@@ -430,6 +433,7 @@
.WillOnce(Return(true));
packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
+ packet_router_.OnBatchComplete();
packet_router_.RemoveSendRtpModule(&rtp);
}
@@ -495,6 +499,24 @@
packet_router_.RemoveSendRtpModule(&rtp_2);
}
+TEST_F(PacketRouterTest, RoutesBatchCompleteToActiveModules) {
+ NiceMock<MockRtpRtcpInterface> rtp_1;
+ NiceMock<MockRtpRtcpInterface> rtp_2;
+ constexpr uint32_t kSsrc1 = 4711;
+ constexpr uint32_t kSsrc2 = 1234;
+ ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+ ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+ packet_router_.AddSendRtpModule(&rtp_1, false);
+ packet_router_.AddSendRtpModule(&rtp_2, false);
+ EXPECT_CALL(rtp_1, TrySendPacket).WillOnce(Return(true));
+ packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo());
+ EXPECT_CALL(rtp_1, OnBatchComplete);
+ EXPECT_CALL(rtp_2, OnBatchComplete).Times(0);
+ packet_router_.OnBatchComplete();
+ packet_router_.RemoveSendRtpModule(&rtp_1);
+ packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
using PacketRouterDeathTest = PacketRouterTest;
TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {
diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 52c3fe1..e1e787a 100644
--- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -88,6 +88,7 @@
(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info),
(override));
+ MOCK_METHOD(void, OnBatchComplete, (), (override));
MOCK_METHOD(void,
SetFecProtectionParams,
(const FecProtectionParams& delta_params,
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index f9d7c72..509036d 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -135,6 +135,8 @@
bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override;
+ void OnBatchComplete() override {}
+
void SetFecProtectionParams(const FecProtectionParams& delta_params,
const FecProtectionParams& key_params) override;
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
index 79ae08d..f5627c2 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
@@ -344,6 +344,11 @@
return true;
}
+void ModuleRtpRtcpImpl2::OnBatchComplete() {
+ RTC_DCHECK(rtp_sender_);
+ rtp_sender_->packet_sender.OnBatchComplete();
+}
+
void ModuleRtpRtcpImpl2::SetFecProtectionParams(
const FecProtectionParams& delta_params,
const FecProtectionParams& key_params) {
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
index 214ef04..147cd33 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
@@ -146,6 +146,7 @@
bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) override;
+ void OnBatchComplete() override;
void SetFecProtectionParams(const FecProtectionParams& delta_params,
const FecProtectionParams& key_params) override;
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
index 92f1a5b..89c6d46 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
@@ -148,6 +148,9 @@
// not negotiated. If the RID and Repaired RID extensions are not
// registered, the RID will not be sent.
std::string rid;
+
+ // Enables send packet batching from the egress RTP sender.
+ bool enable_send_packet_batching = false;
};
// Stats for RTCP sender reports (SR) for a specific SSRC.
@@ -317,6 +320,10 @@
virtual bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) = 0;
+ // Notifies that a batch of packet sends is completed. The implementation can
+ // use this to optimize packet sending.
+ virtual void OnBatchComplete() = 0;
+
// Update the FEC protection parameters to use for delta- and key-frames.
// Only used when deferred FEC is active.
virtual void SetFecProtectionParams(
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc
index d6052f8..fdc2792 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc
@@ -66,7 +66,8 @@
RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
RtpPacketHistory* packet_history)
- : worker_queue_(TaskQueueBase::Current()),
+ : enable_send_packet_batching_(config.enable_send_packet_batching),
+ worker_queue_(TaskQueueBase::Current()),
ssrc_(config.local_media_ssrc),
rtx_ssrc_(config.rtx_send_ssrc),
flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
@@ -76,9 +77,7 @@
packet_history_(packet_history),
transport_(config.outgoing_transport),
event_log_(config.event_log),
-#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
is_audio_(config.audio),
-#endif
need_rtp_packet_infos_(config.need_rtp_packet_infos),
fec_generator_(config.fec_generator),
transport_feedback_observer_(config.transport_feedback_callback),
@@ -139,13 +138,13 @@
RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
}
- const uint32_t packet_ssrc = packet->Ssrc();
const Timestamp now = clock_->CurrentTime();
#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
- worker_queue_->PostTask(SafeTask(
- task_safety_.flag(),
- [this, now, packet_ssrc]() { BweTestLoggingPlot(now, packet_ssrc); }));
+ worker_queue_->PostTask(SafeTask(task_safety_.flag(),
+ [this, now, packet_ssrc = packet->Ssrc()]() {
+ BweTestLoggingPlot(now, packet_ssrc);
+ }));
#endif
if (need_rtp_packet_infos_ &&
@@ -225,6 +224,26 @@
}
}
+ auto compound_packet = Packet{std::move(packet), pacing_info, now};
+ if (enable_send_packet_batching_ && !is_audio_) {
+ packets_to_send_.push_back(std::move(compound_packet));
+ } else {
+ CompleteSendPacket(compound_packet, false);
+ }
+}
+
+void RtpSenderEgress::OnBatchComplete() {
+ RTC_DCHECK_RUN_ON(&pacer_checker_);
+ for (auto& packet : packets_to_send_) {
+ CompleteSendPacket(packet, &packet == &packets_to_send_.back());
+ }
+ packets_to_send_.clear();
+}
+
+void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
+ bool last_in_batch) {
+ auto& [packet, pacing_info, now] = compound_packet;
+
const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
packet->packet_type() == RtpPacketMediaType::kVideo;
@@ -246,12 +265,14 @@
options.additional_data = packet->additional_data();
+ const uint32_t packet_ssrc = packet->Ssrc();
if (packet->packet_type() != RtpPacketMediaType::kPadding &&
packet->packet_type() != RtpPacketMediaType::kRetransmission) {
UpdateDelayStatistics(packet->capture_time(), now, packet_ssrc);
UpdateOnSendPacket(options.packet_id, packet->capture_time(), packet_ssrc);
}
-
+ options.batchable = enable_send_packet_batching_ && !is_audio_;
+ options.last_packet_in_batch = last_in_batch;
const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
// Put packet in retransmission history or update pending status even if
@@ -279,9 +300,9 @@
// TODO(bugs.webrtc.org/137439): clean up task posting when the combined
// network/worker project launches.
if (TaskQueueBase::Current() != worker_queue_) {
- worker_queue_->PostTask(
- SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
- counter = std::move(counter), size]() {
+ worker_queue_->PostTask(SafeTask(
+ task_safety_.flag(), [this, now = now, packet_ssrc, packet_type,
+ counter = std::move(counter), size]() {
RTC_DCHECK_RUN_ON(worker_queue_);
UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter),
size);
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h
index 4b7b502..accdeb1 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.h
@@ -67,6 +67,7 @@
void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_);
+ void OnBatchComplete();
uint32_t Ssrc() const { return ssrc_; }
absl::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
absl::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
@@ -100,6 +101,13 @@
rtc::ArrayView<const uint16_t> sequence_numbers);
private:
+ struct Packet {
+ std::unique_ptr<RtpPacketToSend> rtp_packet;
+ PacedPacketInfo info;
+ Timestamp now;
+ };
+ void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch)
+ RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_);
RtpSendRates GetSendRatesLocked(Timestamp now) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
@@ -128,6 +136,7 @@
// Called on a timer, once a second, on the worker_queue_.
void PeriodicUpdate();
+ const bool enable_send_packet_batching_;
TaskQueueBase* const worker_queue_;
RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_;
const uint32_t ssrc_;
@@ -138,9 +147,7 @@
RtpPacketHistory* const packet_history_;
Transport* const transport_;
RtcEventLog* const event_log_;
-#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
const bool is_audio_;
-#endif
const bool need_rtp_packet_infos_;
VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_);
absl::optional<uint16_t> last_sent_seq_ RTC_GUARDED_BY(pacer_checker_);
@@ -178,6 +185,7 @@
const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
RTC_GUARDED_BY(worker_queue_);
RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
+ std::vector<Packet> packets_to_send_ RTC_GUARDED_BY(pacer_checker_);
ScopedTaskSafety task_safety_;
};
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
index 504a467..75f1835 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
@@ -35,9 +35,10 @@
namespace {
using ::testing::_;
+using ::testing::AllOf;
using ::testing::Field;
+using ::testing::InSequence;
using ::testing::NiceMock;
-using ::testing::Optional;
using ::testing::StrictMock;
constexpr Timestamp kStartTime = Timestamp::Millis(123456789);
@@ -96,12 +97,14 @@
public:
explicit TestTransport(RtpHeaderExtensionMap* extensions)
: total_data_sent_(DataSize::Zero()), extensions_(extensions) {}
+ MOCK_METHOD(void, SentRtp, (const PacketOptions& options), ());
bool SendRtp(const uint8_t* packet,
size_t length,
const PacketOptions& options) override {
total_data_sent_ += DataSize::Bytes(length);
last_packet_.emplace(rtc::MakeArrayView(packet, length), options,
extensions_);
+ SentRtp(options);
return true;
}
@@ -133,6 +136,7 @@
RtpRtcpInterface::Configuration DefaultConfig() {
RtpRtcpInterface::Configuration config;
+ config.audio = false;
config.clock = clock_;
config.outgoing_transport = &transport_;
config.local_media_ssrc = kSsrc;
@@ -175,7 +179,7 @@
NiceMock<MockSendPacketObserver> send_packet_observer_;
NiceMock<MockTransportFeedbackObserver> feedback_observer_;
RtpHeaderExtensionMap header_extensions_;
- TestTransport transport_;
+ NiceMock<TestTransport> transport_;
RtpPacketHistory packet_history_;
test::ExplicitKeyValueConfig trials_;
uint16_t sequence_number_;
@@ -209,6 +213,43 @@
sender->SendPacket(std::move(packet), PacedPacketInfo());
}
+TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenNotBatching) {
+ std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
+ EXPECT_CALL(transport_,
+ SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+ Field(&PacketOptions::batchable, false))));
+ sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+}
+
+TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenBatchingWithAudio) {
+ auto config = DefaultConfig();
+ config.enable_send_packet_batching = true;
+ config.audio = true;
+ auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
+ EXPECT_CALL(transport_,
+ SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+ Field(&PacketOptions::batchable, false))))
+ .Times(2);
+ sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+ sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+}
+
+TEST_F(RtpSenderEgressTest, CollectsPacketsWhenBatchingWithVideo) {
+ auto config = DefaultConfig();
+ config.enable_send_packet_batching = true;
+ auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
+ sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+ sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+ InSequence s;
+ EXPECT_CALL(transport_,
+ SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+ Field(&PacketOptions::batchable, true))));
+ EXPECT_CALL(transport_,
+ SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, true),
+ Field(&PacketOptions::batchable, true))));
+ sender->OnBatchComplete();
+}
+
TEST_F(RtpSenderEgressTest, PacketOptionsIsRetransmitSetByPacketType) {
std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
diff --git a/rtc_base/async_packet_socket.h b/rtc_base/async_packet_socket.h
index 90f2a13..ad9a79e 100644
--- a/rtc_base/async_packet_socket.h
+++ b/rtc_base/async_packet_socket.h
@@ -54,6 +54,12 @@
PacketTimeUpdateParams packet_time_params;
// PacketInfo is passed to SentPacket when signaling this packet is sent.
PacketInfo info_signaled_after_sent;
+ // True if this is a batchable packet. Batchable packets are collected at low
+ // levels and sent first when their AsyncPacketSocket receives a
+ // OnSendBatchComplete call.
+ bool batchable = false;
+ // True if this is the last packet of a batch.
+ bool last_packet_in_batch = false;
};
// Provides the ability to receive packets asynchronously. Sends are not