RtpTransportControllerSend::ProcessSentPacket: remove PostTask.

This CL removes a PostTask in response to packet receipt reception.
This is made possible due to PacketRouter lock removal in
https://webrtc-review.googlesource.com/c/src/+/300964.

Depending on how transport code is organized, this may lead to
possibility of packet receipts arriving in
RtpTransportControllerSend which may re-enter the PacingController's
ProcessPackets method, leading to out-of-order packet sends. Fix
this by detecting re-entry and avoiding a second ProcessPackets call
in the TaskQueuePacedSender.

Bug: chromium:1373439
Change-Id: I24928f2d28a240d0860fe7e4a114cedf1f13d2bd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/304580
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40017}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 5f30390..9e8117f 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -372,25 +372,24 @@
 }
 void RtpTransportControllerSend::OnSentPacket(
     const rtc::SentPacket& sent_packet) {
-  // Normally called on the network thread !
-  // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and
-  // simplify task posting logic when the combined network/worker project
+  // Normally called on the network thread!
+  // TODO(crbug.com/1373439): Clarify other thread contexts calling in,
+  // and simplify task posting logic when the combined network/worker project
   // launches.
   if (TaskQueueBase::Current() != task_queue_) {
     task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() {
       RTC_DCHECK_RUN_ON(&sequence_checker_);
-      ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
+      ProcessSentPacket(sent_packet);
     }));
     return;
   }
 
   RTC_DCHECK_RUN_ON(&sequence_checker_);
-  ProcessSentPacket(sent_packet, /*posted_to_worker=*/false);
+  ProcessSentPacket(sent_packet);
 }
 
 void RtpTransportControllerSend::ProcessSentPacket(
-    const rtc::SentPacket& sent_packet,
-    bool posted_to_worker) {
+    const rtc::SentPacket& sent_packet) {
   RTC_DCHECK_RUN_ON(&sequence_checker_);
   absl::optional<SentPacket> packet_msg =
       transport_feedback_adapter_.ProcessSentPacket(sent_packet);
@@ -403,24 +402,7 @@
     control_update = controller_->OnSentPacket(*packet_msg);
   if (!congestion_update && !control_update.has_updates())
     return;
-  if (posted_to_worker) {
-    ProcessSentPacketUpdates(std::move(control_update));
-  } else {
-    // TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit
-    // removing this PostTask.
-    // At least in test situations (and possibly in production environments), we
-    // may get here synchronously with locks taken in PacketRouter::SendPacket.
-    // Because the pacer may at times synchronously re-enter
-    // PacketRouter::SendPacket, we need to break the chain here and PostTask to
-    // get out of the lock. In testing, having updates to process happens pretty
-    // rarely so we do not usually get here.
-    task_queue_->PostTask(
-        SafeTask(safety_.flag(),
-                 [this, control_update = std::move(control_update)]() mutable {
-                   RTC_DCHECK_RUN_ON(&sequence_checker_);
-                   ProcessSentPacketUpdates(std::move(control_update));
-                 }));
-  }
+  ProcessSentPacketUpdates(std::move(control_update));
 }
 
 // RTC_RUN_ON(task_queue_)
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index 02a7f52..b5134a5 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -141,8 +141,8 @@
   void UpdateCongestedState() RTC_RUN_ON(sequence_checker_);
   absl::optional<bool> GetCongestedStateUpdate() const
       RTC_RUN_ON(sequence_checker_);
-  void ProcessSentPacket(const rtc::SentPacket& sent_packet,
-                         bool posted_to_worker) RTC_RUN_ON(sequence_checker_);
+  void ProcessSentPacket(const rtc::SentPacket& sent_packet)
+      RTC_RUN_ON(sequence_checker_);
   void ProcessSentPacketUpdates(NetworkControlUpdate updates)
       RTC_RUN_ON(sequence_checker_);
 
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index 04056da..5e6e303 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1774,7 +1774,7 @@
   // depending on configuration set at object initialization.
   RTC_DCHECK_RUN_ON(&network_thread_checker_);
 
-  // TODO(bugs.webrtc.org/137439): Stop posting to the worker thread when the
+  // TODO(crbug.com/1373439): Stop posting to the worker thread when the
   // combined network/worker project launches.
   if (webrtc::TaskQueueBase::Current() != worker_thread_) {
     worker_thread_->PostTask(
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index b5dfdd4..afa36ea 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -13,6 +13,7 @@
 #include <algorithm>
 #include <utility>
 
+#include "absl/cleanup/cleanup.h"
 #include "api/task_queue/pending_task_safety_flag.h"
 #include "api/transport/network_types.h"
 #include "rtc_base/checks.h"
@@ -83,7 +84,7 @@
     std::vector<ProbeClusterConfig> probe_cluster_configs) {
   RTC_DCHECK_RUN_ON(task_queue_);
   pacing_controller_.CreateProbeClusters(probe_cluster_configs);
-  MaybeProcessPackets(Timestamp::MinusInfinity());
+  MaybeScheduleProcessPackets();
 }
 
 void TaskQueuePacedSender::Pause() {
@@ -100,14 +101,14 @@
 void TaskQueuePacedSender::SetCongested(bool congested) {
   RTC_DCHECK_RUN_ON(task_queue_);
   pacing_controller_.SetCongested(congested);
-  MaybeProcessPackets(Timestamp::MinusInfinity());
+  MaybeScheduleProcessPackets();
 }
 
 void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
                                           DataRate padding_rate) {
   RTC_DCHECK_RUN_ON(task_queue_);
   pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
-  MaybeProcessPackets(Timestamp::MinusInfinity());
+  MaybeScheduleProcessPackets();
 }
 
 void TaskQueuePacedSender::EnqueuePackets(
@@ -200,6 +201,12 @@
   current_stats_ = stats;
 }
 
+// RTC_RUN_ON(task_queue_)
+void TaskQueuePacedSender::MaybeScheduleProcessPackets() {
+  if (!processing_packets_)
+    MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
 void TaskQueuePacedSender::MaybeProcessPackets(
     Timestamp scheduled_process_time) {
   RTC_DCHECK_RUN_ON(task_queue_);
@@ -211,6 +218,15 @@
     return;
   }
 
+  // Protects against re-entry from transport feedback calling into the task
+  // queue pacer.
+  RTC_DCHECK(!processing_packets_);
+  processing_packets_ = true;
+  absl::Cleanup cleanup = [this] {
+    RTC_DCHECK_RUN_ON(task_queue_);
+    processing_packets_ = false;
+  };
+
   Timestamp next_send_time = pacing_controller_.NextSendTime();
   RTC_DCHECK(next_send_time.IsFinite());
   const Timestamp now = clock_->CurrentTime();
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 7bb0057..fd71be1 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -131,9 +131,12 @@
   void OnStatsUpdated(const Stats& stats);
 
  private:
+  // Call in response to state updates that could warrant sending out packets.
+  // Protected against re-entry from packet sent receipts.
+  void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_);
   // Check if it is time to send packets, or schedule a delayed task if not.
   // Use Timestamp::MinusInfinity() to indicate that this call has _not_
-  // been scheduled by the pacing controller. If this is the case, check if
+  // been scheduled by the pacing controller. If this is the case, check if we
   // can execute immediately otherwise schedule a delay task that calls this
   // method again with desired (finite) scheduled process time.
   void MaybeProcessPackets(Timestamp scheduled_process_time);
@@ -180,6 +183,8 @@
   bool include_overhead_ RTC_GUARDED_BY(task_queue_);
 
   Stats current_stats_ RTC_GUARDED_BY(task_queue_);
+  // Protects against ProcessPackets reentry from packet sent receipts.
+  bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false;
 
   ScopedTaskSafety safety_;
   TaskQueueBase* task_queue_;
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc
index fdc2792..22da99b 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc
@@ -297,7 +297,7 @@
     RtpPacketMediaType packet_type = *packet->packet_type();
     RtpPacketCounter counter(*packet);
     size_t size = packet->size();
-    // TODO(bugs.webrtc.org/137439): clean up task posting when the combined
+    // TODO(crbug.com/1373439): clean up task posting when the combined
     // network/worker project launches.
     if (TaskQueueBase::Current() != worker_queue_) {
       worker_queue_->PostTask(SafeTask(