Introduce support for video packet batching.

This CL introduces a new feature enabling video packet send batches.
The feature is enabled via
PeerConnectionInterface
::RTCConfiguration
::MediaConfig
::enable_send_packet_batching.

PacketOptions have been augmented with attribute "batchable" (set for
all video packets) and attribute "last_packet_in_batch" which gives
injected AsyncPacketSockets a chance to understand when a batch begins
and ends.

When the feature is on, packets are collected in RtpSenderEgress. On
reception of OnBatchComplete from PacingController, RtpSenderEgress
sends the collected batch, setting "last_packet_in_batch" to true
in the last packet.

Bug: chromium:1439830
Change-Id: I1846b9d4a8a0efd227d617691213a2e048bdc8a2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/303720
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40012}
diff --git a/api/call/transport.h b/api/call/transport.h
index 8bff288..387ce8d 100644
--- a/api/call/transport.h
+++ b/api/call/transport.h
@@ -36,6 +36,10 @@
   bool is_retransmit = false;
   bool included_in_feedback = false;
   bool included_in_allocation = false;
+  // Whether this packet can be part of a packet batch at lower levels.
+  bool batchable = false;
+  // Whether this packet is the last of a batch.
+  bool last_packet_in_batch = false;
 };
 
 class Transport {
diff --git a/call/call_config.h b/call/call_config.h
index 6df4ab7..918c077 100644
--- a/call/call_config.h
+++ b/call/call_config.h
@@ -81,6 +81,9 @@
 
   // The burst interval of the pacer, see TaskQueuePacedSender constructor.
   absl::optional<TimeDelta> pacer_burst_interval;
+
+  // Enables send packet batching from the egress RTP sender.
+  bool enable_send_packet_batching = false;
 };
 
 }  // namespace webrtc
diff --git a/call/rtp_config.h b/call/rtp_config.h
index 0cc9466..a01a902 100644
--- a/call/rtp_config.h
+++ b/call/rtp_config.h
@@ -159,6 +159,9 @@
   // RTCP CNAME, see RFC 3550.
   std::string c_name;
 
+  // Enables send packet batching from the egress RTP sender.
+  bool enable_send_packet_batching = false;
+
   bool IsMediaSsrc(uint32_t ssrc) const;
   bool IsRtxSsrc(uint32_t ssrc) const;
   bool IsFlexfecSsrc(uint32_t ssrc) const;
diff --git a/call/rtp_video_sender.cc b/call/rtp_video_sender.cc
index 9108e83..27040f3 100644
--- a/call/rtp_video_sender.cc
+++ b/call/rtp_video_sender.cc
@@ -235,6 +235,8 @@
   configuration.extmap_allow_mixed = rtp_config.extmap_allow_mixed;
   configuration.rtcp_report_interval_ms = rtcp_report_interval_ms;
   configuration.field_trials = &trials;
+  configuration.enable_send_packet_batching =
+      rtp_config.enable_send_packet_batching;
 
   std::vector<RtpStreamSender> rtp_streams;
 
diff --git a/media/base/media_channel_impl.cc b/media/base/media_channel_impl.cc
index ca2a117..0de04b6 100644
--- a/media/base/media_channel_impl.cc
+++ b/media/base/media_channel_impl.cc
@@ -203,6 +203,8 @@
       [this, packet_id = options.packet_id,
        included_in_feedback = options.included_in_feedback,
        included_in_allocation = options.included_in_allocation,
+       batchable = options.batchable,
+       last_packet_in_batch = options.last_packet_in_batch,
        packet = rtc::CopyOnWriteBuffer(data, len, kMaxRtpPacketLen)]() mutable {
         rtc::PacketOptions rtc_options;
         rtc_options.packet_id = packet_id;
@@ -213,6 +215,8 @@
             included_in_feedback;
         rtc_options.info_signaled_after_sent.included_in_allocation =
             included_in_allocation;
+        rtc_options.batchable = batchable;
+        rtc_options.last_packet_in_batch = last_packet_in_batch;
         SendPacket(&packet, rtc_options);
       };
 
diff --git a/media/base/media_config.h b/media/base/media_config.h
index b383c9a..7827705 100644
--- a/media/base/media_config.h
+++ b/media/base/media_config.h
@@ -62,6 +62,9 @@
 
     // Time interval between RTCP report for video
     int rtcp_report_interval_ms = 1000;
+
+    // Enables send packet batching from the egress RTP sender.
+    bool enable_send_packet_batching = false;
   } video;
 
   // Audio-specific config.
@@ -82,6 +85,8 @@
            video.experiment_cpu_load_estimator ==
                o.video.experiment_cpu_load_estimator &&
            video.rtcp_report_interval_ms == o.video.rtcp_report_interval_ms &&
+           video.enable_send_packet_batching ==
+               o.video.enable_send_packet_batching &&
            audio.rtcp_report_interval_ms == o.audio.rtcp_report_interval_ms;
   }
 
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index cc1edc8..04056da 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1386,6 +1386,8 @@
   config.crypto_options = crypto_options_;
   config.rtp.extmap_allow_mixed = ExtmapAllowMixed();
   config.rtcp_report_interval_ms = video_config_.rtcp_report_interval_ms;
+  config.rtp.enable_send_packet_batching =
+      video_config_.enable_send_packet_batching;
 
   WebRtcVideoSendStream* stream = new WebRtcVideoSendStream(
       call_, sp, std::move(config), default_send_options_,
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index fb36f92..ea80c8c 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -62,6 +62,7 @@
     "../utility:utility",
   ]
   absl_deps = [
+    "//third_party/abseil-cpp/absl/cleanup",
     "//third_party/abseil-cpp/absl/memory",
     "//third_party/abseil-cpp/absl/strings",
     "//third_party/abseil-cpp/absl/types:optional",
diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc
index a526fc13..495aaeb 100644
--- a/modules/pacing/pacing_controller.cc
+++ b/modules/pacing/pacing_controller.cc
@@ -15,6 +15,7 @@
 #include <utility>
 #include <vector>
 
+#include "absl/cleanup/cleanup.h"
 #include "absl/strings/match.h"
 #include "modules/pacing/bitrate_prober.h"
 #include "modules/pacing/interval_budget.h"
@@ -374,6 +375,9 @@
 }
 
 void PacingController::ProcessPackets() {
+  absl::Cleanup cleanup = [packet_sender = packet_sender_] {
+    packet_sender->OnBatchComplete();
+  };
   const Timestamp now = CurrentTime();
   Timestamp target_send_time = now;
 
diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h
index b0d802b..2145868 100644
--- a/modules/pacing/pacing_controller.h
+++ b/modules/pacing/pacing_controller.h
@@ -52,6 +52,8 @@
     virtual std::vector<std::unique_ptr<RtpPacketToSend>> FetchFec() = 0;
     virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
         DataSize size) = 0;
+    // TODO(bugs.webrtc.org/1439830): Make pure virtual once subclasses adapt.
+    virtual void OnBatchComplete() {}
 
     // TODO(bugs.webrtc.org/11340): Make pure virtual once downstream projects
     // have been updated.
diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc
index 2c5f8e9..ade71cd 100644
--- a/modules/pacing/pacing_controller_unittest.cc
+++ b/modules/pacing/pacing_controller_unittest.cc
@@ -138,6 +138,7 @@
               GetRtxSsrcForMedia,
               (uint32_t),
               (const, override));
+  MOCK_METHOD(void, OnBatchComplete, (), (override));
 };
 
 // Mock callback implementing the raw api.
@@ -165,6 +166,7 @@
               GetRtxSsrcForMedia,
               (uint32_t),
               (const, override));
+  MOCK_METHOD(void, OnBatchComplete, (), (override));
 };
 
 class PacingControllerPadding : public PacingController::PacketSender {
@@ -202,6 +204,8 @@
     return absl::nullopt;
   }
 
+  void OnBatchComplete() override {}
+
   size_t padding_sent() { return padding_sent_; }
   size_t total_bytes_sent() { return total_bytes_sent_; }
 
@@ -260,6 +264,7 @@
   absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t) const override {
     return absl::nullopt;
   }
+  void OnBatchComplete() override {}
 
   int packets_sent() const { return packets_sent_; }
   int padding_packets_sent() const { return padding_packets_sent_; }
@@ -1454,6 +1459,13 @@
   pacer->ProcessPackets();
 }
 
+TEST_F(PacingControllerTest, ProvidesOnBatchCompleteToPacketSender) {
+  MockPacketSender callback;
+  auto pacer = std::make_unique<PacingController>(&clock_, &callback, trials_);
+  EXPECT_CALL(callback, OnBatchComplete);
+  pacer->ProcessPackets();
+}
+
 TEST_F(PacingControllerTest, ProbeClusterId) {
   MockPacketSender callback;
   uint32_t ssrc = 12346;
diff --git a/modules/pacing/packet_router.cc b/modules/pacing/packet_router.cc
index a25f692..135b618 100644
--- a/modules/pacing/packet_router.cc
+++ b/modules/pacing/packet_router.cc
@@ -90,6 +90,7 @@
   auto it = send_modules_map_.find(ssrc);
   RTC_DCHECK(it != send_modules_map_.end());
   send_modules_list_.remove(it->second);
+  RTC_CHECK(modules_used_in_current_batch_.empty());
   send_modules_map_.erase(it);
 }
 
@@ -166,6 +167,7 @@
     RTC_LOG(LS_WARNING) << "Failed to send packet, rejected by RTP module.";
     return;
   }
+  modules_used_in_current_batch_.insert(rtp_module);
 
   // Sending succeeded.
 
@@ -184,6 +186,16 @@
   }
 }
 
+void PacketRouter::OnBatchComplete() {
+  RTC_DCHECK_RUN_ON(&thread_checker_);
+  TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
+               "PacketRouter::OnBatchComplete");
+  for (auto& module : modules_used_in_current_batch_) {
+    module->OnBatchComplete();
+  }
+  modules_used_in_current_batch_.clear();
+}
+
 std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::FetchFec() {
   RTC_DCHECK_RUN_ON(&thread_checker_);
   std::vector<std::unique_ptr<RtpPacketToSend>> fec_packets =
diff --git a/modules/pacing/packet_router.h b/modules/pacing/packet_router.h
index 805f422..61779f4 100644
--- a/modules/pacing/packet_router.h
+++ b/modules/pacing/packet_router.h
@@ -16,6 +16,7 @@
 
 #include <list>
 #include <memory>
+#include <set>
 #include <unordered_map>
 #include <utility>
 #include <vector>
@@ -62,6 +63,7 @@
       uint32_t ssrc,
       rtc::ArrayView<const uint16_t> sequence_numbers) override;
   absl::optional<uint32_t> GetRtxSsrcForMedia(uint32_t ssrc) const override;
+  void OnBatchComplete() override;
 
   uint16_t CurrentTransportSequenceNumber() const;
 
@@ -108,6 +110,8 @@
 
   std::vector<std::unique_ptr<RtpPacketToSend>> pending_fec_packets_
       RTC_GUARDED_BY(thread_checker_);
+  std::set<RtpRtcpInterface*> modules_used_in_current_batch_
+      RTC_GUARDED_BY(thread_checker_);
 };
 }  // namespace webrtc
 #endif  // MODULES_PACING_PACKET_ROUTER_H_
diff --git a/modules/pacing/packet_router_unittest.cc b/modules/pacing/packet_router_unittest.cc
index 2528129..7604de6 100644
--- a/modules/pacing/packet_router_unittest.cc
+++ b/modules/pacing/packet_router_unittest.cc
@@ -252,6 +252,7 @@
 
   // If the last active module is removed, and no module sends media before
   // the next padding request, and arbitrary module will be selected.
+  packet_router_.OnBatchComplete();
   packet_router_.RemoveSendRtpModule(&rtp_2);
 
   // Send on and then remove all remaining modules.
@@ -301,6 +302,7 @@
               packet_router.CurrentTransportSequenceNumber());
   }
 
+  packet_router.OnBatchComplete();
   packet_router.RemoveSendRtpModule(&rtp_1);
 }
 
@@ -346,7 +348,7 @@
           _))
       .WillOnce(Return(true));
   packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
-
+  packet_router_.OnBatchComplete();
   packet_router_.RemoveSendRtpModule(&rtp_1);
 }
 
@@ -390,6 +392,7 @@
       .WillOnce(Return(true));
   packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
 
+  packet_router_.OnBatchComplete();
   packet_router_.RemoveSendRtpModule(&rtp_1);
   packet_router_.RemoveSendRtpModule(&rtp_2);
 }
@@ -430,6 +433,7 @@
       .WillOnce(Return(true));
   packet_router_.SendPacket(std::move(packet), PacedPacketInfo());
 
+  packet_router_.OnBatchComplete();
   packet_router_.RemoveSendRtpModule(&rtp);
 }
 
@@ -495,6 +499,24 @@
   packet_router_.RemoveSendRtpModule(&rtp_2);
 }
 
+TEST_F(PacketRouterTest, RoutesBatchCompleteToActiveModules) {
+  NiceMock<MockRtpRtcpInterface> rtp_1;
+  NiceMock<MockRtpRtcpInterface> rtp_2;
+  constexpr uint32_t kSsrc1 = 4711;
+  constexpr uint32_t kSsrc2 = 1234;
+  ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
+  ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
+  packet_router_.AddSendRtpModule(&rtp_1, false);
+  packet_router_.AddSendRtpModule(&rtp_2, false);
+  EXPECT_CALL(rtp_1, TrySendPacket).WillOnce(Return(true));
+  packet_router_.SendPacket(BuildRtpPacket(kSsrc1), PacedPacketInfo());
+  EXPECT_CALL(rtp_1, OnBatchComplete);
+  EXPECT_CALL(rtp_2, OnBatchComplete).Times(0);
+  packet_router_.OnBatchComplete();
+  packet_router_.RemoveSendRtpModule(&rtp_1);
+  packet_router_.RemoveSendRtpModule(&rtp_2);
+}
+
 #if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)
 using PacketRouterDeathTest = PacketRouterTest;
 TEST_F(PacketRouterDeathTest, DoubleRegistrationOfSendModuleDisallowed) {
diff --git a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
index 52c3fe1..e1e787a 100644
--- a/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
+++ b/modules/rtp_rtcp/mocks/mock_rtp_rtcp.h
@@ -88,6 +88,7 @@
               (std::unique_ptr<RtpPacketToSend> packet,
                const PacedPacketInfo& pacing_info),
               (override));
+  MOCK_METHOD(void, OnBatchComplete, (), (override));
   MOCK_METHOD(void,
               SetFecProtectionParams,
               (const FecProtectionParams& delta_params,
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl.h b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
index f9d7c72..509036d 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl.h
@@ -135,6 +135,8 @@
   bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
                      const PacedPacketInfo& pacing_info) override;
 
+  void OnBatchComplete() override {}
+
   void SetFecProtectionParams(const FecProtectionParams& delta_params,
                               const FecProtectionParams& key_params) override;
 
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
index 79ae08d..f5627c2 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
@@ -344,6 +344,11 @@
   return true;
 }
 
+void ModuleRtpRtcpImpl2::OnBatchComplete() {
+  RTC_DCHECK(rtp_sender_);
+  rtp_sender_->packet_sender.OnBatchComplete();
+}
+
 void ModuleRtpRtcpImpl2::SetFecProtectionParams(
     const FecProtectionParams& delta_params,
     const FecProtectionParams& key_params) {
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
index 214ef04..147cd33 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
@@ -146,6 +146,7 @@
 
   bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
                      const PacedPacketInfo& pacing_info) override;
+  void OnBatchComplete() override;
 
   void SetFecProtectionParams(const FecProtectionParams& delta_params,
                               const FecProtectionParams& key_params) override;
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_interface.h b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
index 92f1a5b..89c6d46 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_interface.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_interface.h
@@ -148,6 +148,9 @@
     // not negotiated. If the RID and Repaired RID extensions are not
     // registered, the RID will not be sent.
     std::string rid;
+
+    // Enables send packet batching from the egress RTP sender.
+    bool enable_send_packet_batching = false;
   };
 
   // Stats for RTCP sender reports (SR) for a specific SSRC.
@@ -317,6 +320,10 @@
   virtual bool TrySendPacket(std::unique_ptr<RtpPacketToSend> packet,
                              const PacedPacketInfo& pacing_info) = 0;
 
+  // Notifies that a batch of packet sends is completed. The implementation can
+  // use this to optimize packet sending.
+  virtual void OnBatchComplete() = 0;
+
   // Update the FEC protection parameters to use for delta- and key-frames.
   // Only used when deferred FEC is active.
   virtual void SetFecProtectionParams(
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc
index d6052f8..fdc2792 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc
@@ -66,7 +66,8 @@
 
 RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
                                  RtpPacketHistory* packet_history)
-    : worker_queue_(TaskQueueBase::Current()),
+    : enable_send_packet_batching_(config.enable_send_packet_batching),
+      worker_queue_(TaskQueueBase::Current()),
       ssrc_(config.local_media_ssrc),
       rtx_ssrc_(config.rtx_send_ssrc),
       flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
@@ -76,9 +77,7 @@
       packet_history_(packet_history),
       transport_(config.outgoing_transport),
       event_log_(config.event_log),
-#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
       is_audio_(config.audio),
-#endif
       need_rtp_packet_infos_(config.need_rtp_packet_infos),
       fec_generator_(config.fec_generator),
       transport_feedback_observer_(config.transport_feedback_callback),
@@ -139,13 +138,13 @@
     RTC_DCHECK(packet->retransmitted_sequence_number().has_value());
   }
 
-  const uint32_t packet_ssrc = packet->Ssrc();
   const Timestamp now = clock_->CurrentTime();
 
 #if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
-  worker_queue_->PostTask(SafeTask(
-      task_safety_.flag(),
-      [this, now, packet_ssrc]() { BweTestLoggingPlot(now, packet_ssrc); }));
+  worker_queue_->PostTask(SafeTask(task_safety_.flag(),
+                                   [this, now, packet_ssrc = packet->Ssrc()]() {
+                                     BweTestLoggingPlot(now, packet_ssrc);
+                                   }));
 #endif
 
   if (need_rtp_packet_infos_ &&
@@ -225,6 +224,26 @@
     }
   }
 
+  auto compound_packet = Packet{std::move(packet), pacing_info, now};
+  if (enable_send_packet_batching_ && !is_audio_) {
+    packets_to_send_.push_back(std::move(compound_packet));
+  } else {
+    CompleteSendPacket(compound_packet, false);
+  }
+}
+
+void RtpSenderEgress::OnBatchComplete() {
+  RTC_DCHECK_RUN_ON(&pacer_checker_);
+  for (auto& packet : packets_to_send_) {
+    CompleteSendPacket(packet, &packet == &packets_to_send_.back());
+  }
+  packets_to_send_.clear();
+}
+
+void RtpSenderEgress::CompleteSendPacket(const Packet& compound_packet,
+                                         bool last_in_batch) {
+  auto& [packet, pacing_info, now] = compound_packet;
+
   const bool is_media = packet->packet_type() == RtpPacketMediaType::kAudio ||
                         packet->packet_type() == RtpPacketMediaType::kVideo;
 
@@ -246,12 +265,14 @@
 
   options.additional_data = packet->additional_data();
 
+  const uint32_t packet_ssrc = packet->Ssrc();
   if (packet->packet_type() != RtpPacketMediaType::kPadding &&
       packet->packet_type() != RtpPacketMediaType::kRetransmission) {
     UpdateDelayStatistics(packet->capture_time(), now, packet_ssrc);
     UpdateOnSendPacket(options.packet_id, packet->capture_time(), packet_ssrc);
   }
-
+  options.batchable = enable_send_packet_batching_ && !is_audio_;
+  options.last_packet_in_batch = last_in_batch;
   const bool send_success = SendPacketToNetwork(*packet, options, pacing_info);
 
   // Put packet in retransmission history or update pending status even if
@@ -279,9 +300,9 @@
     // TODO(bugs.webrtc.org/137439): clean up task posting when the combined
     // network/worker project launches.
     if (TaskQueueBase::Current() != worker_queue_) {
-      worker_queue_->PostTask(
-          SafeTask(task_safety_.flag(), [this, now, packet_ssrc, packet_type,
-                                         counter = std::move(counter), size]() {
+      worker_queue_->PostTask(SafeTask(
+          task_safety_.flag(), [this, now = now, packet_ssrc, packet_type,
+                                counter = std::move(counter), size]() {
             RTC_DCHECK_RUN_ON(worker_queue_);
             UpdateRtpStats(now, packet_ssrc, packet_type, std::move(counter),
                            size);
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h
index 4b7b502..accdeb1 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.h
@@ -67,6 +67,7 @@
 
   void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
                   const PacedPacketInfo& pacing_info) RTC_LOCKS_EXCLUDED(lock_);
+  void OnBatchComplete();
   uint32_t Ssrc() const { return ssrc_; }
   absl::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
   absl::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
@@ -100,6 +101,13 @@
       rtc::ArrayView<const uint16_t> sequence_numbers);
 
  private:
+  struct Packet {
+    std::unique_ptr<RtpPacketToSend> rtp_packet;
+    PacedPacketInfo info;
+    Timestamp now;
+  };
+  void CompleteSendPacket(const Packet& compound_packet, bool last_in_batch)
+      RTC_LOCKS_EXCLUDED(lock_) RTC_RUN_ON(pacer_checker_);
   RtpSendRates GetSendRatesLocked(Timestamp now) const
       RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
   bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
@@ -128,6 +136,7 @@
   // Called on a timer, once a second, on the worker_queue_.
   void PeriodicUpdate();
 
+  const bool enable_send_packet_batching_;
   TaskQueueBase* const worker_queue_;
   RTC_NO_UNIQUE_ADDRESS SequenceChecker pacer_checker_;
   const uint32_t ssrc_;
@@ -138,9 +147,7 @@
   RtpPacketHistory* const packet_history_;
   Transport* const transport_;
   RtcEventLog* const event_log_;
-#if BWE_TEST_LOGGING_COMPILE_TIME_ENABLE
   const bool is_audio_;
-#endif
   const bool need_rtp_packet_infos_;
   VideoFecGenerator* const fec_generator_ RTC_GUARDED_BY(pacer_checker_);
   absl::optional<uint16_t> last_sent_seq_ RTC_GUARDED_BY(pacer_checker_);
@@ -178,6 +185,7 @@
   const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
       RTC_GUARDED_BY(worker_queue_);
   RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
+  std::vector<Packet> packets_to_send_ RTC_GUARDED_BY(pacer_checker_);
   ScopedTaskSafety task_safety_;
 };
 
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
index 504a467..75f1835 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
@@ -35,9 +35,10 @@
 namespace {
 
 using ::testing::_;
+using ::testing::AllOf;
 using ::testing::Field;
+using ::testing::InSequence;
 using ::testing::NiceMock;
-using ::testing::Optional;
 using ::testing::StrictMock;
 
 constexpr Timestamp kStartTime = Timestamp::Millis(123456789);
@@ -96,12 +97,14 @@
  public:
   explicit TestTransport(RtpHeaderExtensionMap* extensions)
       : total_data_sent_(DataSize::Zero()), extensions_(extensions) {}
+  MOCK_METHOD(void, SentRtp, (const PacketOptions& options), ());
   bool SendRtp(const uint8_t* packet,
                size_t length,
                const PacketOptions& options) override {
     total_data_sent_ += DataSize::Bytes(length);
     last_packet_.emplace(rtc::MakeArrayView(packet, length), options,
                          extensions_);
+    SentRtp(options);
     return true;
   }
 
@@ -133,6 +136,7 @@
 
   RtpRtcpInterface::Configuration DefaultConfig() {
     RtpRtcpInterface::Configuration config;
+    config.audio = false;
     config.clock = clock_;
     config.outgoing_transport = &transport_;
     config.local_media_ssrc = kSsrc;
@@ -175,7 +179,7 @@
   NiceMock<MockSendPacketObserver> send_packet_observer_;
   NiceMock<MockTransportFeedbackObserver> feedback_observer_;
   RtpHeaderExtensionMap header_extensions_;
-  TestTransport transport_;
+  NiceMock<TestTransport> transport_;
   RtpPacketHistory packet_history_;
   test::ExplicitKeyValueConfig trials_;
   uint16_t sequence_number_;
@@ -209,6 +213,43 @@
   sender->SendPacket(std::move(packet), PacedPacketInfo());
 }
 
+TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenNotBatching) {
+  std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
+  EXPECT_CALL(transport_,
+              SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+                            Field(&PacketOptions::batchable, false))));
+  sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+}
+
+TEST_F(RtpSenderEgressTest, SendsPacketsOneByOneWhenBatchingWithAudio) {
+  auto config = DefaultConfig();
+  config.enable_send_packet_batching = true;
+  config.audio = true;
+  auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
+  EXPECT_CALL(transport_,
+              SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+                            Field(&PacketOptions::batchable, false))))
+      .Times(2);
+  sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+  sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+}
+
+TEST_F(RtpSenderEgressTest, CollectsPacketsWhenBatchingWithVideo) {
+  auto config = DefaultConfig();
+  config.enable_send_packet_batching = true;
+  auto sender = std::make_unique<RtpSenderEgress>(config, &packet_history_);
+  sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+  sender->SendPacket(BuildRtpPacket(), PacedPacketInfo());
+  InSequence s;
+  EXPECT_CALL(transport_,
+              SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, false),
+                            Field(&PacketOptions::batchable, true))));
+  EXPECT_CALL(transport_,
+              SentRtp(AllOf(Field(&PacketOptions::last_packet_in_batch, true),
+                            Field(&PacketOptions::batchable, true))));
+  sender->OnBatchComplete();
+}
+
 TEST_F(RtpSenderEgressTest, PacketOptionsIsRetransmitSetByPacketType) {
   std::unique_ptr<RtpSenderEgress> sender = CreateRtpSenderEgress();
 
diff --git a/rtc_base/async_packet_socket.h b/rtc_base/async_packet_socket.h
index 90f2a13..ad9a79e 100644
--- a/rtc_base/async_packet_socket.h
+++ b/rtc_base/async_packet_socket.h
@@ -54,6 +54,12 @@
   PacketTimeUpdateParams packet_time_params;
   // PacketInfo is passed to SentPacket when signaling this packet is sent.
   PacketInfo info_signaled_after_sent;
+  // True if this is a batchable packet. Batchable packets are collected at low
+  // levels and sent first when their AsyncPacketSocket receives a
+  // OnSendBatchComplete call.
+  bool batchable = false;
+  // True if this is the last packet of a batch.
+  bool last_packet_in_batch = false;
 };
 
 // Provides the ability to receive packets asynchronously. Sends are not