ModuleRtpRtcpImpl2: remove RTCP send polling.

This change migrates RTCP send polling happening in
ModuleRtpRtcpImpl2::Process to task queues.

ModuleRtpRtcpImpl2 would previously only cause RTCP sends while being
registered with a ProcessThread. This is now relaxed so that RTCP will
be sent regardless of ProcessThread registration status, and it seems
no tests cared.

Now there's only one piece of polling left in Process.

Bug: webrtc:11581
Change-Id: Ibdcffefccef7363f2089c34a9c7d694d222445c0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/222603
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34350}
diff --git a/modules/rtp_rtcp/source/rtcp_sender.cc b/modules/rtp_rtcp/source/rtcp_sender.cc
index f531a30..c4c30a9 100644
--- a/modules/rtp_rtcp/source/rtcp_sender.cc
+++ b/modules/rtp_rtcp/source/rtcp_sender.cc
@@ -138,7 +138,7 @@
   return result;
 }
 
-RTCPSender::RTCPSender(const Configuration& config)
+RTCPSender::RTCPSender(Configuration config)
     : audio_(config.audio),
       ssrc_(config.local_media_ssrc),
       clock_(config.clock),
@@ -149,6 +149,8 @@
       report_interval_(config.rtcp_report_interval.value_or(
           TimeDelta::Millis(config.audio ? kDefaultAudioReportInterval
                                          : kDefaultVideoReportInterval))),
+      schedule_next_rtcp_send_evaluation_function_(
+          std::move(config.schedule_next_rtcp_send_evaluation_function)),
       sending_(false),
       timestamp_offset_(0),
       last_rtp_timestamp_(0),
@@ -201,7 +203,7 @@
     next_time_to_send_rtcp_ = absl::nullopt;
   } else if (method_ == RtcpMode::kOff) {
     // When switching on, reschedule the next packet
-    next_time_to_send_rtcp_ = clock_->CurrentTime() + report_interval_ / 2;
+    SetNextRtcpSendEvaluationDuration(report_interval_ / 2);
   }
   method_ = new_method;
 }
@@ -284,7 +286,7 @@
   SetFlag(kRtcpRemb, /*is_volatile=*/false);
   // Send a REMB immediately if we have a new REMB. The frequency of REMBs is
   // throttled by the caller.
-  next_time_to_send_rtcp_ = clock_->CurrentTime();
+  SetNextRtcpSendEvaluationDuration(TimeDelta::Zero());
 }
 
 void RTCPSender::UnsetRemb() {
@@ -428,7 +430,9 @@
   Timestamp now = clock_->CurrentTime();
 
   MutexLock lock(&mutex_rtcp_sender_);
-
+  RTC_DCHECK(
+      (method_ == RtcpMode::kOff && !next_time_to_send_rtcp_.has_value()) ||
+      (method_ != RtcpMode::kOff && next_time_to_send_rtcp_.has_value()));
   if (method_ == RtcpMode::kOff)
     return false;
 
@@ -807,7 +811,7 @@
         random_.Rand(min_interval_int * 1 / 2, min_interval_int * 3 / 2));
 
     RTC_DCHECK(!time_to_next.IsZero());
-    next_time_to_send_rtcp_ = clock_->CurrentTime() + time_to_next;
+    SetNextRtcpSendEvaluationDuration(time_to_next);
 
     // RtcpSender expected to be used for sending either just sender reports
     // or just receiver reports.
@@ -901,7 +905,7 @@
     RTC_LOG(LS_INFO) << "Emitting TargetBitrate XR for SSRC " << ssrc_
                      << " with new layers enabled/disabled: "
                      << video_bitrate_allocation_.ToString();
-    next_time_to_send_rtcp_ = clock_->CurrentTime();
+    SetNextRtcpSendEvaluationDuration(TimeDelta::Zero());
   } else {
     video_bitrate_allocation_ = bitrate;
   }
@@ -962,4 +966,10 @@
   sender.Send();
 }
 
+void RTCPSender::SetNextRtcpSendEvaluationDuration(TimeDelta duration) {
+  next_time_to_send_rtcp_ = clock_->CurrentTime() + duration;
+  if (schedule_next_rtcp_send_evaluation_function_)
+    schedule_next_rtcp_send_evaluation_function_(duration);
+}
+
 }  // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtcp_sender.h b/modules/rtp_rtcp/source/rtcp_sender.h
index e3fda38..e50ce44 100644
--- a/modules/rtp_rtcp/source/rtcp_sender.h
+++ b/modules/rtp_rtcp/source/rtcp_sender.h
@@ -64,6 +64,12 @@
     // Estimate RTT as non-sender as described in
     // https://tools.ietf.org/html/rfc3611#section-4.4 and #section-4.5
     bool non_sender_rtt_measurement = false;
+    // Optional callback which, if specified, is used by RTCPSender to schedule
+    // the next time to evaluate if RTCP should be sent by means of
+    // TimeToSendRTCPReport/SendRTCP.
+    // The RTCPSender client still needs to call TimeToSendRTCPReport/SendRTCP
+    // to actually get RTCP sent.
+    std::function<void(TimeDelta)> schedule_next_rtcp_send_evaluation_function;
 
     RtcEventLog* event_log = nullptr;
     absl::optional<TimeDelta> rtcp_report_interval;
@@ -91,7 +97,7 @@
     RTCPReceiver* receiver;
   };
 
-  explicit RTCPSender(const Configuration& config);
+  explicit RTCPSender(Configuration config);
   // TODO(bugs.webrtc.org/11581): delete this temporary compatibility helper
   // once downstream dependencies migrates.
   explicit RTCPSender(const RtpRtcpInterface::Configuration& config);
@@ -224,6 +230,10 @@
   void BuildNACK(const RtcpContext& context, PacketSender& sender)
       RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_);
 
+  // |duration| being TimeDelta::Zero() means schedule immediately.
+  void SetNextRtcpSendEvaluationDuration(TimeDelta duration)
+      RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_rtcp_sender_);
+
   const bool audio_;
   // TODO(bugs.webrtc.org/11581): `mutex_rtcp_sender_` shouldn't be required if
   // we consistently run network related operations on the network thread.
@@ -238,6 +248,10 @@
   Transport* const transport_;
 
   const TimeDelta report_interval_;
+  // Set from
+  // RTCPSender::Configuration::schedule_next_rtcp_send_evaluation_function.
+  const std::function<void(TimeDelta)>
+      schedule_next_rtcp_send_evaluation_function_;
 
   mutable Mutex mutex_rtcp_sender_;
   bool sending_ RTC_GUARDED_BY(mutex_rtcp_sender_);
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
index 25c92b2..7705457 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.cc
@@ -20,12 +20,16 @@
 #include <utility>
 
 #include "absl/types/optional.h"
+#include "api/sequence_checker.h"
 #include "api/transport/field_trial_based_config.h"
+#include "api/units/time_delta.h"
 #include "api/units/timestamp.h"
 #include "modules/rtp_rtcp/source/rtcp_packet/dlrr.h"
 #include "modules/rtp_rtcp/source/rtp_rtcp_config.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/logging.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "rtc_base/time_utils.h"
 #include "system_wrappers/include/ntp_time.h"
 
 #ifdef _WIN32
@@ -39,6 +43,22 @@
 const int64_t kDefaultExpectedRetransmissionTimeMs = 125;
 
 constexpr TimeDelta kRttUpdateInterval = TimeDelta::Millis(1000);
+
+RTCPSender::Configuration AddRtcpSendEvaluationCallback(
+    RTCPSender::Configuration config,
+    std::function<void(TimeDelta)> send_evaluation_callback) {
+  config.schedule_next_rtcp_send_evaluation_function =
+      std::move(send_evaluation_callback);
+  return config;
+}
+
+int DelayMillisForDuration(TimeDelta duration) {
+  // TimeDelta::ms() rounds downwards sometimes which leads to too little time
+  // slept. Account for this, unless |duration| is exactly representable in
+  // millisecs.
+  return (duration.us() + rtc::kNumMillisecsPerSec - 1) /
+         rtc::kNumMicrosecsPerMillisec;
+}
 }  // namespace
 
 ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
@@ -57,8 +77,11 @@
 
 ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
     : worker_queue_(TaskQueueBase::Current()),
-      rtcp_sender_(
-          RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration)),
+      rtcp_sender_(AddRtcpSendEvaluationCallback(
+          RTCPSender::Configuration::FromRtpRtcpConfiguration(configuration),
+          [this](TimeDelta duration) {
+            ScheduleRtcpSendEvaluation(duration);
+          })),
       rtcp_receiver_(configuration, this),
       clock_(configuration.clock),
       last_rtt_process_time_(clock_->TimeInMilliseconds()),
@@ -139,11 +162,6 @@
       rtcp_sender_.SetTargetBitrate(target_bitrate);
     }
   }
-
-  // TODO(bugs.webrtc.org/11581): Run this on a separate set of delayed tasks
-  // based off of next_time_to_send_rtcp_ in RTCPSender.
-  if (rtcp_sender_.TimeToSendRTCPReport())
-    rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
 }
 
 void ModuleRtpRtcpImpl2::SetRtxSendStatus(int mode) {
@@ -771,4 +789,60 @@
     rtcp_receiver_.NotifyTmmbrUpdated();
 }
 
+// RTC_RUN_ON(worker_queue_);
+void ModuleRtpRtcpImpl2::MaybeSendRtcp() {
+  if (rtcp_sender_.TimeToSendRTCPReport())
+    rtcp_sender_.SendRTCP(GetFeedbackState(), kRtcpReport);
+}
+
+// TODO(bugs.webrtc.org/12889): Consider removing this function when the issue
+// is resolved.
+// RTC_RUN_ON(worker_queue_);
+void ModuleRtpRtcpImpl2::MaybeSendRtcpAtOrAfterTimestamp(
+    Timestamp execution_time) {
+  Timestamp now = clock_->CurrentTime();
+  if (now >= execution_time) {
+    MaybeSendRtcp();
+    return;
+  }
+
+  RTC_DLOG(LS_WARNING)
+      << "BUGBUG: Task queue scheduled delayed call too early.";
+
+  ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, execution_time - now);
+}
+
+void ModuleRtpRtcpImpl2::ScheduleRtcpSendEvaluation(TimeDelta duration) {
+  // We end up here under various sequences including the worker queue, and
+  // the RTCPSender lock is held.
+  // We're assuming that the fact that RTCPSender executes under other sequences
+  // than the worker queue on which it's created on implies that external
+  // synchronization is present and removes this activity before destruction.
+  if (duration.IsZero()) {
+    worker_queue_->PostTask(ToQueuedTask(task_safety_, [this] {
+      RTC_DCHECK_RUN_ON(worker_queue_);
+      MaybeSendRtcp();
+    }));
+  } else {
+    Timestamp execution_time = clock_->CurrentTime() + duration;
+    ScheduleMaybeSendRtcpAtOrAfterTimestamp(execution_time, duration);
+  }
+}
+
+void ModuleRtpRtcpImpl2::ScheduleMaybeSendRtcpAtOrAfterTimestamp(
+    Timestamp execution_time,
+    TimeDelta duration) {
+  // We end up here under various sequences including the worker queue, and
+  // the RTCPSender lock is held.
+  // See note in ScheduleRtcpSendEvaluation about why |worker_queue_| can be
+  // accessed.
+  worker_queue_->PostDelayedTask(
+      ToQueuedTask(task_safety_,
+                   [this, execution_time] {
+                     RTC_DCHECK_RUN_ON(worker_queue_);
+                     MaybeSendRtcpAtOrAfterTimestamp(execution_time);
+                   }),
+      DelayMillisForDuration(duration));
+}
+
 }  // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
index 4c38517..849cc42 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2.h
@@ -23,6 +23,7 @@
 #include "api/rtp_headers.h"
 #include "api/sequence_checker.h"
 #include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
 #include "api/video/video_bitrate_allocation.h"
 #include "modules/include/module_fec_types.h"
 #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
@@ -32,7 +33,6 @@
 #include "modules/rtp_rtcp/source/rtcp_sender.h"
 #include "modules/rtp_rtcp/source/rtp_packet_history.h"
 #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
-#include "modules/rtp_rtcp/source/rtp_rtcp_impl2.h"
 #include "modules/rtp_rtcp/source/rtp_sender.h"
 #include "modules/rtp_rtcp/source/rtp_sender_egress.h"
 #include "rtc_base/gtest_prod_util.h"
@@ -40,6 +40,8 @@
 #include "rtc_base/system/no_unique_address.h"
 #include "rtc_base/task_utils/pending_task_safety_flag.h"
 #include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/task_utils/to_queued_task.h"
+#include "rtc_base/thread_annotations.h"
 
 namespace webrtc {
 
@@ -200,7 +202,8 @@
   int64_t ExpectedRetransmissionTimeMs() const override;
 
   // Force a send of an RTCP packet.
-  // Normal SR and RR are triggered via the process function.
+  // Normal SR and RR are triggered via the task queue that's current when this
+  // object is created.
   int32_t SendRTCP(RTCPPacketType rtcpPacketType) override;
 
   void GetSendStreamDataCounters(
@@ -289,12 +292,28 @@
   // Returns true if the module is configured to store packets.
   bool StorePackets() const;
 
+  // Used from RtcpSenderMediator to maybe send rtcp.
+  void MaybeSendRtcp() RTC_RUN_ON(worker_queue_);
+
+  // Called when |rtcp_sender_| informs of the next RTCP instant. The method may
+  // be called on various sequences, and is called under a RTCPSenderLock.
+  void ScheduleRtcpSendEvaluation(TimeDelta duration);
+
+  // Helper method combating too early delayed calls from task queues.
+  // TODO(bugs.webrtc.org/12889): Consider removing this function when the issue
+  // is resolved.
+  void MaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time)
+      RTC_RUN_ON(worker_queue_);
+
+  // Schedules a call to MaybeSendRtcpAtOrAfterTimestamp delayed by |duration|.
+  void ScheduleMaybeSendRtcpAtOrAfterTimestamp(Timestamp execution_time,
+                                               TimeDelta duration);
+
   TaskQueueBase* const worker_queue_;
   RTC_NO_UNIQUE_ADDRESS SequenceChecker process_thread_checker_;
   RTC_NO_UNIQUE_ADDRESS SequenceChecker packet_sequence_checker_;
 
   std::unique_ptr<RtpSenderContext> rtp_sender_;
-
   RTCPSender rtcp_sender_;
   RTCPReceiver rtcp_receiver_;
 
@@ -316,6 +335,8 @@
   // The processed RTT from RtcpRttStats.
   mutable Mutex mutex_rtt_;
   int64_t rtt_ms_ RTC_GUARDED_BY(mutex_rtt_);
+
+  RTC_NO_UNIQUE_ADDRESS ScopedTaskSafety task_safety_;
 };
 
 }  // namespace webrtc
diff --git a/video/end_to_end_tests/network_state_tests.cc b/video/end_to_end_tests/network_state_tests.cc
index 9abde3b..4e0e86f 100644
--- a/video/end_to_end_tests/network_state_tests.cc
+++ b/video/end_to_end_tests/network_state_tests.cc
@@ -10,13 +10,19 @@
 
 #include <memory>
 
+#include "api/media_types.h"
+#include "api/task_queue/default_task_queue_factory.h"
+#include "api/task_queue/task_queue_base.h"
+#include "api/task_queue/task_queue_factory.h"
 #include "api/test/simulated_network.h"
 #include "api/video_codecs/video_encoder.h"
 #include "call/fake_network_pipe.h"
 #include "call/simulated_network.h"
 #include "modules/rtp_rtcp/source/rtp_packet.h"
+#include "rtc_base/location.h"
 #include "rtc_base/synchronization/mutex.h"
 #include "rtc_base/task_queue_for_test.h"
+#include "rtc_base/task_utils/to_queued_task.h"
 #include "system_wrappers/include/sleep.h"
 #include "test/call_test.h"
 #include "test/fake_encoder.h"
@@ -166,7 +172,10 @@
     explicit NetworkStateTest(TaskQueueBase* task_queue)
         : EndToEndTest(kDefaultTimeoutMs),
           FakeEncoder(Clock::GetRealTimeClock()),
-          task_queue_(task_queue),
+          e2e_test_task_queue_(task_queue),
+          task_queue_(CreateDefaultTaskQueueFactory()->CreateTaskQueue(
+              "NetworkStateTest",
+              TaskQueueFactory::Priority::NORMAL)),
           sender_call_(nullptr),
           receiver_call_(nullptr),
           encoder_factory_(this),
@@ -219,26 +228,36 @@
       send_config->encoder_settings.encoder_factory = &encoder_factory_;
     }
 
+    void SignalChannelNetworkState(Call* call,
+                                   MediaType media_type,
+                                   NetworkState network_state) {
+      SendTask(RTC_FROM_HERE, e2e_test_task_queue_,
+               [call, media_type, network_state] {
+                 call->SignalChannelNetworkState(media_type, network_state);
+               });
+    }
+
     void PerformTest() override {
       EXPECT_TRUE(encoded_frames_.Wait(kDefaultTimeoutMs))
           << "No frames received by the encoder.";
 
-      SendTask(RTC_FROM_HERE, task_queue_, [this]() {
+      SendTask(RTC_FROM_HERE, task_queue_.get(), [this]() {
         // Wait for packets from both sender/receiver.
         WaitForPacketsOrSilence(false, false);
 
         // Sender-side network down for audio; there should be no effect on
         // video
-        sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkDown);
+        SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkDown);
+
         WaitForPacketsOrSilence(false, false);
 
         // Receiver-side network down for audio; no change expected
-        receiver_call_->SignalChannelNetworkState(MediaType::AUDIO,
-                                                  kNetworkDown);
+        SignalChannelNetworkState(receiver_call_, MediaType::AUDIO,
+                                  kNetworkDown);
         WaitForPacketsOrSilence(false, false);
 
         // Sender-side network down.
-        sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkDown);
+        SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkDown);
         {
           MutexLock lock(&test_mutex_);
           // After network goes down we shouldn't be encoding more frames.
@@ -248,14 +267,14 @@
         WaitForPacketsOrSilence(true, false);
 
         // Receiver-side network down.
-        receiver_call_->SignalChannelNetworkState(MediaType::VIDEO,
-                                                  kNetworkDown);
+        SignalChannelNetworkState(receiver_call_, MediaType::VIDEO,
+                                  kNetworkDown);
         WaitForPacketsOrSilence(true, true);
 
         // Network up for audio for both sides; video is still not expected to
         // start
-        sender_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
-        receiver_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
+        SignalChannelNetworkState(sender_call_, MediaType::AUDIO, kNetworkUp);
+        SignalChannelNetworkState(receiver_call_, MediaType::AUDIO, kNetworkUp);
         WaitForPacketsOrSilence(true, true);
 
         // Network back up again for both.
@@ -265,8 +284,8 @@
           // network.
           sender_state_ = kNetworkUp;
         }
-        sender_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
-        receiver_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
+        SignalChannelNetworkState(sender_call_, MediaType::VIDEO, kNetworkUp);
+        SignalChannelNetworkState(receiver_call_, MediaType::VIDEO, kNetworkUp);
         WaitForPacketsOrSilence(false, false);
 
         // TODO(skvlad): add tests to verify that the audio streams are stopped
@@ -340,7 +359,8 @@
       }
     }
 
-    TaskQueueBase* const task_queue_;
+    TaskQueueBase* const e2e_test_task_queue_;
+    std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
     Mutex test_mutex_;
     rtc::Event encoded_frames_;
     rtc::Event packet_event_;
diff --git a/video/rtp_video_stream_receiver2_unittest.cc b/video/rtp_video_stream_receiver2_unittest.cc
index d22f12e..d23b604 100644
--- a/video/rtp_video_stream_receiver2_unittest.cc
+++ b/video/rtp_video_stream_receiver2_unittest.cc
@@ -13,6 +13,7 @@
 #include <memory>
 #include <utility>
 
+#include "api/task_queue/task_queue_base.h"
 #include "api/video/video_codec_type.h"
 #include "api/video/video_frame_type.h"
 #include "common_video/h264/h264_common.h"
@@ -38,6 +39,7 @@
 #include "test/gtest.h"
 #include "test/mock_frame_transformer.h"
 #include "test/time_controller/simulated_task_queue.h"
+#include "test/time_controller/simulated_time_controller.h"
 
 using ::testing::_;
 using ::testing::ElementsAre;
@@ -158,7 +160,12 @@
  public:
   RtpVideoStreamReceiver2Test() : RtpVideoStreamReceiver2Test("") {}
   explicit RtpVideoStreamReceiver2Test(std::string field_trials)
-      : override_field_trials_(field_trials),
+      : time_controller_(Timestamp::Millis(100)),
+        task_queue_(time_controller_.GetTaskQueueFactory()->CreateTaskQueue(
+            "RtpVideoStreamReceiver2Test",
+            TaskQueueFactory::Priority::NORMAL)),
+        task_queue_setter_(task_queue_.get()),
+        override_field_trials_(field_trials),
         config_(CreateConfig()),
         process_thread_(ProcessThread::Create("TestThread")) {
     rtp_receive_statistics_ =
@@ -233,8 +240,9 @@
     return config;
   }
 
-  TokenTaskQueue task_queue_;
-  TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_{&task_queue_};
+  GlobalSimulatedTimeController time_controller_;
+  std::unique_ptr<TaskQueueBase, TaskQueueDeleter> task_queue_;
+  TokenTaskQueue::CurrentTaskQueueSetter task_queue_setter_;
 
   const webrtc::test::ScopedFieldTrials override_field_trials_;
   VideoReceiveStream::Config config_;