RtpTransportControllerSend::ProcessSentPacket: remove PostTask.
This CL removes a PostTask in response to packet receipt reception.
This is made possible due to PacketRouter lock removal in
https://webrtc-review.googlesource.com/c/src/+/300964.
Depending on how transport code is organized, this may lead to
possibility of packet receipts arriving in
RtpTransportControllerSend which may re-enter the PacingController's
ProcessPackets method, leading to out-of-order packet sends. Fix
this by detecting re-entry and avoiding a second ProcessPackets call
in the TaskQueuePacedSender.
Bug: chromium:1373439
Change-Id: I24928f2d28a240d0860fe7e4a114cedf1f13d2bd
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/304580
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40017}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 5f30390..9e8117f 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -372,25 +372,24 @@
}
void RtpTransportControllerSend::OnSentPacket(
const rtc::SentPacket& sent_packet) {
- // Normally called on the network thread !
- // TODO(bugs.webrtc.org/137439): Clarify other thread contexts calling in, and
- // simplify task posting logic when the combined network/worker project
+ // Normally called on the network thread!
+ // TODO(crbug.com/1373439): Clarify other thread contexts calling in,
+ // and simplify task posting logic when the combined network/worker project
// launches.
if (TaskQueueBase::Current() != task_queue_) {
task_queue_->PostTask(SafeTask(safety_.flag(), [this, sent_packet]() {
RTC_DCHECK_RUN_ON(&sequence_checker_);
- ProcessSentPacket(sent_packet, /*posted_to_worker=*/true);
+ ProcessSentPacket(sent_packet);
}));
return;
}
RTC_DCHECK_RUN_ON(&sequence_checker_);
- ProcessSentPacket(sent_packet, /*posted_to_worker=*/false);
+ ProcessSentPacket(sent_packet);
}
void RtpTransportControllerSend::ProcessSentPacket(
- const rtc::SentPacket& sent_packet,
- bool posted_to_worker) {
+ const rtc::SentPacket& sent_packet) {
RTC_DCHECK_RUN_ON(&sequence_checker_);
absl::optional<SentPacket> packet_msg =
transport_feedback_adapter_.ProcessSentPacket(sent_packet);
@@ -403,24 +402,7 @@
control_update = controller_->OnSentPacket(*packet_msg);
if (!congestion_update && !control_update.has_updates())
return;
- if (posted_to_worker) {
- ProcessSentPacketUpdates(std::move(control_update));
- } else {
- // TODO(bugs.webrtc.org/137439): Aim to remove downstream locks to permit
- // removing this PostTask.
- // At least in test situations (and possibly in production environments), we
- // may get here synchronously with locks taken in PacketRouter::SendPacket.
- // Because the pacer may at times synchronously re-enter
- // PacketRouter::SendPacket, we need to break the chain here and PostTask to
- // get out of the lock. In testing, having updates to process happens pretty
- // rarely so we do not usually get here.
- task_queue_->PostTask(
- SafeTask(safety_.flag(),
- [this, control_update = std::move(control_update)]() mutable {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
- ProcessSentPacketUpdates(std::move(control_update));
- }));
- }
+ ProcessSentPacketUpdates(std::move(control_update));
}
// RTC_RUN_ON(task_queue_)
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index 02a7f52..b5134a5 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -141,8 +141,8 @@
void UpdateCongestedState() RTC_RUN_ON(sequence_checker_);
absl::optional<bool> GetCongestedStateUpdate() const
RTC_RUN_ON(sequence_checker_);
- void ProcessSentPacket(const rtc::SentPacket& sent_packet,
- bool posted_to_worker) RTC_RUN_ON(sequence_checker_);
+ void ProcessSentPacket(const rtc::SentPacket& sent_packet)
+ RTC_RUN_ON(sequence_checker_);
void ProcessSentPacketUpdates(NetworkControlUpdate updates)
RTC_RUN_ON(sequence_checker_);
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index 04056da..5e6e303 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1774,7 +1774,7 @@
// depending on configuration set at object initialization.
RTC_DCHECK_RUN_ON(&network_thread_checker_);
- // TODO(bugs.webrtc.org/137439): Stop posting to the worker thread when the
+ // TODO(crbug.com/1373439): Stop posting to the worker thread when the
// combined network/worker project launches.
if (webrtc::TaskQueueBase::Current() != worker_thread_) {
worker_thread_->PostTask(
diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc
index b5dfdd4..afa36ea 100644
--- a/modules/pacing/task_queue_paced_sender.cc
+++ b/modules/pacing/task_queue_paced_sender.cc
@@ -13,6 +13,7 @@
#include <algorithm>
#include <utility>
+#include "absl/cleanup/cleanup.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/network_types.h"
#include "rtc_base/checks.h"
@@ -83,7 +84,7 @@
std::vector<ProbeClusterConfig> probe_cluster_configs) {
RTC_DCHECK_RUN_ON(task_queue_);
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
- MaybeProcessPackets(Timestamp::MinusInfinity());
+ MaybeScheduleProcessPackets();
}
void TaskQueuePacedSender::Pause() {
@@ -100,14 +101,14 @@
void TaskQueuePacedSender::SetCongested(bool congested) {
RTC_DCHECK_RUN_ON(task_queue_);
pacing_controller_.SetCongested(congested);
- MaybeProcessPackets(Timestamp::MinusInfinity());
+ MaybeScheduleProcessPackets();
}
void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
DataRate padding_rate) {
RTC_DCHECK_RUN_ON(task_queue_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
- MaybeProcessPackets(Timestamp::MinusInfinity());
+ MaybeScheduleProcessPackets();
}
void TaskQueuePacedSender::EnqueuePackets(
@@ -200,6 +201,12 @@
current_stats_ = stats;
}
+// RTC_RUN_ON(task_queue_)
+void TaskQueuePacedSender::MaybeScheduleProcessPackets() {
+ if (!processing_packets_)
+ MaybeProcessPackets(Timestamp::MinusInfinity());
+}
+
void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(task_queue_);
@@ -211,6 +218,15 @@
return;
}
+ // Protects against re-entry from transport feedback calling into the task
+ // queue pacer.
+ RTC_DCHECK(!processing_packets_);
+ processing_packets_ = true;
+ absl::Cleanup cleanup = [this] {
+ RTC_DCHECK_RUN_ON(task_queue_);
+ processing_packets_ = false;
+ };
+
Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
const Timestamp now = clock_->CurrentTime();
diff --git a/modules/pacing/task_queue_paced_sender.h b/modules/pacing/task_queue_paced_sender.h
index 7bb0057..fd71be1 100644
--- a/modules/pacing/task_queue_paced_sender.h
+++ b/modules/pacing/task_queue_paced_sender.h
@@ -131,9 +131,12 @@
void OnStatsUpdated(const Stats& stats);
private:
+ // Call in response to state updates that could warrant sending out packets.
+ // Protected against re-entry from packet sent receipts.
+ void MaybeScheduleProcessPackets() RTC_RUN_ON(task_queue_);
// Check if it is time to send packets, or schedule a delayed task if not.
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
- // been scheduled by the pacing controller. If this is the case, check if
+ // been scheduled by the pacing controller. If this is the case, check if we
// can execute immediately otherwise schedule a delay task that calls this
// method again with desired (finite) scheduled process time.
void MaybeProcessPackets(Timestamp scheduled_process_time);
@@ -180,6 +183,8 @@
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
Stats current_stats_ RTC_GUARDED_BY(task_queue_);
+ // Protects against ProcessPackets reentry from packet sent receipts.
+ bool processing_packets_ RTC_GUARDED_BY(task_queue_) = false;
ScopedTaskSafety safety_;
TaskQueueBase* task_queue_;
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc
index fdc2792..22da99b 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc
@@ -297,7 +297,7 @@
RtpPacketMediaType packet_type = *packet->packet_type();
RtpPacketCounter counter(*packet);
size_t size = packet->size();
- // TODO(bugs.webrtc.org/137439): clean up task posting when the combined
+ // TODO(crbug.com/1373439): clean up task posting when the combined
// network/worker project launches.
if (TaskQueueBase::Current() != worker_queue_) {
worker_queue_->PostTask(SafeTask(