Move receive side congestion controller periodic task to worker thread

This way call no longer needs dedicated process thread

Bug: webrtc:7219
Change-Id: I8ab677b1e6b909eeb726aefed5e6d10ce4bc43b7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265921
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37279}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index 1f5e707..7e8eb65 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -332,6 +332,7 @@
     "../rtc_base/experiments:field_trial_parser",
     "../rtc_base/network:sent_packet",
     "../rtc_base/system:no_unique_address",
+    "../rtc_base/task_utils:repeating_task",
     "../system_wrappers",
     "../system_wrappers:field_trial",
     "../system_wrappers:metrics",
diff --git a/call/call.cc b/call/call.cc
index d84964d..b86cc10 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -57,6 +57,7 @@
 #include "rtc_base/logging.h"
 #include "rtc_base/strings/string_builder.h"
 #include "rtc_base/system/no_unique_address.h"
+#include "rtc_base/task_utils/repeating_task.h"
 #include "rtc_base/thread_annotations.h"
 #include "rtc_base/time_utils.h"
 #include "rtc_base/trace_event.h"
@@ -203,7 +204,6 @@
   Call(Clock* clock,
        const Call::Config& config,
        std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
-       rtc::scoped_refptr<SharedModuleThread> module_process_thread,
        TaskQueueFactory* task_queue_factory);
   ~Call() override;
 
@@ -378,7 +378,6 @@
   RTC_NO_UNIQUE_ADDRESS SequenceChecker send_transport_sequence_checker_;
 
   const int num_cpu_cores_;
-  const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
   const std::unique_ptr<CallStats> call_stats_;
   const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
   const Call::Config config_ RTC_GUARDED_BY(worker_thread_);
@@ -456,6 +455,7 @@
   std::atomic<uint32_t> configured_max_padding_bitrate_bps_{0};
 
   ReceiveSideCongestionController receive_side_cc_;
+  RepeatingTaskHandle receive_side_cc_periodic_task_;
 
   const std::unique_ptr<ReceiveTimeCalculator> receive_time_calculator_;
 
@@ -513,7 +513,7 @@
 
 Call* Call::Create(const Call::Config& config,
                    Clock* clock,
-                   rtc::scoped_refptr<SharedModuleThread> call_thread,
+                   rtc::scoped_refptr<SharedModuleThread> /*call_thread*/,
                    std::unique_ptr<ProcessThread> pacer_thread) {
   RTC_DCHECK(config.task_queue_factory);
 
@@ -524,17 +524,17 @@
   return new internal::Call(
       clock, config,
       transport_controller_factory_.Create(transportConfig, clock),
-      std::move(call_thread), config.task_queue_factory);
+      config.task_queue_factory);
 }
 
 Call* Call::Create(const Call::Config& config,
                    Clock* clock,
-                   rtc::scoped_refptr<SharedModuleThread> call_thread,
+                   rtc::scoped_refptr<SharedModuleThread> /*call_thread*/,
                    std::unique_ptr<RtpTransportControllerSendInterface>
                        transportControllerSend) {
   RTC_DCHECK(config.task_queue_factory);
   return new internal::Call(clock, config, std::move(transportControllerSend),
-                            std::move(call_thread), config.task_queue_factory);
+                            config.task_queue_factory);
 }
 
 class SharedModuleThread::Impl {
@@ -796,7 +796,6 @@
 Call::Call(Clock* clock,
            const Call::Config& config,
            std::unique_ptr<RtpTransportControllerSendInterface> transport_send,
-           rtc::scoped_refptr<SharedModuleThread> module_process_thread,
            TaskQueueFactory* task_queue_factory)
     : clock_(clock),
       task_queue_factory_(task_queue_factory),
@@ -811,7 +810,6 @@
                                                               worker_thread_)
                        : nullptr),
       num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
-      module_process_thread_(std::move(module_process_thread)),
       call_stats_(new CallStats(clock_, worker_thread_)),
       bitrate_allocator_(new BitrateAllocator(this)),
       config_(config),
@@ -849,10 +847,11 @@
 
   call_stats_->RegisterStatsObserver(&receive_side_cc_);
 
-  module_process_thread_->process_thread()->RegisterModule(
-      receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
-  module_process_thread_->process_thread()->RegisterModule(&receive_side_cc_,
-                                                           RTC_FROM_HERE);
+  ReceiveSideCongestionController* receive_side_cc = &receive_side_cc_;
+  receive_side_cc_periodic_task_ = RepeatingTaskHandle::Start(
+      worker_thread_,
+      [receive_side_cc] { return receive_side_cc->MaybeProcess(); },
+      TaskQueueBase::DelayPrecision::kLow, clock_);
 }
 
 Call::~Call() {
@@ -864,9 +863,7 @@
   RTC_CHECK(audio_receive_streams_.empty());
   RTC_CHECK(video_receive_streams_.empty());
 
-  module_process_thread_->process_thread()->DeRegisterModule(
-      receive_side_cc_.GetRemoteBitrateEstimator(true));
-  module_process_thread_->process_thread()->DeRegisterModule(&receive_side_cc_);
+  receive_side_cc_periodic_task_.Stop();
   call_stats_->DeregisterStatsObserver(&receive_side_cc_);
   send_stats_.SetFirstPacketTime(transport_send_->GetFirstPacketTime());
 
@@ -887,7 +884,6 @@
   // off being kicked off on request rather than in the ctor.
   transport_send_->RegisterTargetTransferRateObserver(this);
 
-  module_process_thread_->EnsureStarted();
   transport_send_->EnsureStarted();
 }
 
diff --git a/modules/congestion_controller/include/receive_side_congestion_controller.h b/modules/congestion_controller/include/receive_side_congestion_controller.h
index 1dcb468..53a0de2 100644
--- a/modules/congestion_controller/include/receive_side_congestion_controller.h
+++ b/modules/congestion_controller/include/receive_side_congestion_controller.h
@@ -32,8 +32,7 @@
 // relaying for each received RTP packet back to the sender. While for
 // receive side bandwidth estimation, we do the estimation locally and
 // send our results back to the sender.
-class ReceiveSideCongestionController : public CallStatsObserver,
-                                        public Module {
+class ReceiveSideCongestionController : public CallStatsObserver {
  public:
   ReceiveSideCongestionController(
       Clock* clock,
@@ -65,9 +64,12 @@
 
   void SetTransportOverhead(DataSize overhead_per_packet);
 
-  // Implements Module.
-  int64_t TimeUntilNextProcess() override;
-  void Process() override;
+  [[deprecated]] int64_t TimeUntilNextProcess();
+  [[deprecated]] void Process();
+
+  // Runs periodic tasks if it is time to run them, returns time until next
+  // call to `MaybeProcess` should be non idle.
+  TimeDelta MaybeProcess();
 
  private:
   class WrappingBitrateEstimator : public RemoteBitrateEstimator {
diff --git a/modules/congestion_controller/receive_side_congestion_controller.cc b/modules/congestion_controller/receive_side_congestion_controller.cc
index 57be0d2..e495a4d 100644
--- a/modules/congestion_controller/receive_side_congestion_controller.cc
+++ b/modules/congestion_controller/receive_side_congestion_controller.cc
@@ -184,6 +184,21 @@
   remote_bitrate_estimator_.Process();
 }
 
+TimeDelta ReceiveSideCongestionController::MaybeProcess() {
+  int64_t time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess();
+  if (time_until_rbe_ms <= 0) {
+    remote_bitrate_estimator_.Process();
+    time_until_rbe_ms = remote_bitrate_estimator_.TimeUntilNextProcess();
+  }
+  int64_t time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess();
+  if (time_until_rep_ms <= 0) {
+    remote_estimator_proxy_.Process();
+    time_until_rep_ms = remote_estimator_proxy_.TimeUntilNextProcess();
+  }
+  int64_t time_until_next_ms = std::min(time_until_rbe_ms, time_until_rep_ms);
+  return TimeDelta::Millis(std::max<int64_t>(time_until_next_ms, 0));
+}
+
 void ReceiveSideCongestionController::SetMaxDesiredReceiveBitrate(
     DataRate bitrate) {
   remb_throttler_.SetMaxDesiredReceiveBitrate(bitrate);