Using shared task queue for congestion controller.

This simplifies the code and removes the need for a lot of bookkeeping
variables.

Bug: webrtc:9232
Change-Id: I0c9a4b0741ed5353caa22ba5acdcb166357441f2
Reviewed-on: https://webrtc-review.googlesource.com/74240
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#23149}
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 78ede7e..2a4a549 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -29,6 +29,7 @@
 
 std::unique_ptr<SendSideCongestionControllerInterface> CreateController(
     Clock* clock,
+    rtc::TaskQueue* task_queue,
     webrtc::RtcEventLog* event_log,
     PacedSender* pacer,
     const BitrateConstraints& bitrate_config,
@@ -36,7 +37,7 @@
   if (task_queue_controller) {
     RTC_LOG(LS_INFO) << "Using TaskQueue based SSCC";
     return rtc::MakeUnique<webrtc::webrtc_cc::SendSideCongestionController>(
-        clock, event_log, pacer, bitrate_config.start_bitrate_bps,
+        clock, task_queue, event_log, pacer, bitrate_config.start_bitrate_bps,
         bitrate_config.min_bitrate_bps, bitrate_config.max_bitrate_bps);
   }
   RTC_LOG(LS_INFO) << "Using Legacy SSCC";
@@ -59,13 +60,12 @@
       bitrate_configurator_(bitrate_config),
       process_thread_(ProcessThread::Create("SendControllerThread")),
       observer_(nullptr),
-      send_side_cc_(CreateController(clock,
-                                     event_log,
-                                     &pacer_,
-                                     bitrate_config,
-                                     TaskQueueExperimentEnabled())),
       task_queue_("rtp_send_controller") {
-  send_side_cc_ptr_ = send_side_cc_.get();
+  // Created after task_queue to be able to post to the task queue internally.
+  send_side_cc_ =
+      CreateController(clock, &task_queue_, event_log, &pacer_, bitrate_config,
+                       TaskQueueExperimentEnabled());
+
   process_thread_->RegisterModule(&pacer_, RTC_FROM_HERE);
   process_thread_->RegisterModule(send_side_cc_.get(), RTC_FROM_HERE);
   process_thread_->Start();
@@ -89,7 +89,7 @@
   msg.network_estimate.at_time = msg.at_time;
   msg.network_estimate.bwe_period = TimeDelta::ms(probing_interval_ms);
   uint32_t bandwidth_bps;
-  if (send_side_cc_ptr_->AvailableBandwidth(&bandwidth_bps))
+  if (send_side_cc_->AvailableBandwidth(&bandwidth_bps))
     msg.network_estimate.bandwidth = DataRate::bps(bandwidth_bps);
   msg.network_estimate.loss_rate_ratio = fraction_loss / 255.0;
   msg.network_estimate.round_trip_time = TimeDelta::ms(rtt_ms);
diff --git a/call/rtp_transport_controller_send.h b/call/rtp_transport_controller_send.h
index d5bc25e..67248b2 100644
--- a/call/rtp_transport_controller_send.h
+++ b/call/rtp_transport_controller_send.h
@@ -93,17 +93,7 @@
   const std::unique_ptr<ProcessThread> process_thread_;
   rtc::CriticalSection observer_crit_;
   TargetTransferRateObserver* observer_ RTC_GUARDED_BY(observer_crit_);
-  // Caches send_side_cc_.get(), to avoid racing with destructor.
-  // Note that this is declared before send_side_cc_ to ensure that it is not
-  // invalidated until no more tasks can be running on the send_side_cc_ task
-  // queue.
-  // TODO(srte): Remove this when only the task queue based send side congestion
-  // controller is used and it is no longer accessed synchronously in the
-  // OnNetworkChanged callback.
-  SendSideCongestionControllerInterface* send_side_cc_ptr_;
-  // Declared last since it will issue callbacks from a task queue. Declaring it
-  // last ensures that it is destroyed first.
-  const std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
+  std::unique_ptr<SendSideCongestionControllerInterface> send_side_cc_;
   // TODO(perkj): |task_queue_| is supposed to replace |process_thread_|.
   // |task_queue_| is defined last to ensure all pending tasks are cancelled
   // and deleted before any other members.
diff --git a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h
index a352cec..90447e6 100644
--- a/modules/congestion_controller/rtp/include/send_side_congestion_controller.h
+++ b/modules/congestion_controller/rtp/include/send_side_congestion_controller.h
@@ -66,6 +66,7 @@
       public RtcpBandwidthObserver {
  public:
   SendSideCongestionController(const Clock* clock,
+                               rtc::TaskQueue* task_queue,
                                RtcEventLog* event_log,
                                PacedSender* pacer,
                                int start_bitrate_bps,
@@ -150,18 +151,18 @@
   void WaitOnTasksForTest();
 
  private:
-  void MaybeCreateControllers() RTC_RUN_ON(task_queue_ptr_);
-  void MaybeRecreateControllers() RTC_RUN_ON(task_queue_ptr_);
+  void MaybeCreateControllers() RTC_RUN_ON(task_queue_);
+  void MaybeRecreateControllers() RTC_RUN_ON(task_queue_);
 
-  void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_ptr_);
-  void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_ptr_);
-  void UpdatePacerQueue() RTC_RUN_ON(task_queue_ptr_);
+  void StartProcessPeriodicTasks() RTC_RUN_ON(task_queue_);
+  void UpdateControllerWithTimeInterval() RTC_RUN_ON(task_queue_);
+  void UpdatePacerQueue() RTC_RUN_ON(task_queue_);
 
-  void UpdateStreamsConfig() RTC_RUN_ON(task_queue_ptr_);
+  void UpdateStreamsConfig() RTC_RUN_ON(task_queue_);
   void MaybeUpdateOutstandingData();
   void OnReceivedRtcpReceiverReportBlocks(const ReportBlockList& report_blocks,
                                           int64_t now_ms)
-      RTC_RUN_ON(task_queue_ptr_);
+      RTC_RUN_ON(task_queue_);
 
   const Clock* const clock_;
   // PacedSender is thread safe and doesn't need protection here.
@@ -170,57 +171,47 @@
   TransportFeedbackAdapter transport_feedback_adapter_;
 
   const std::unique_ptr<NetworkControllerFactoryInterface>
-      controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_ptr_);
+      controller_factory_with_feedback_ RTC_GUARDED_BY(task_queue_);
   const std::unique_ptr<NetworkControllerFactoryInterface>
-      controller_factory_fallback_ RTC_GUARDED_BY(task_queue_ptr_);
+      controller_factory_fallback_ RTC_GUARDED_BY(task_queue_);
 
   const std::unique_ptr<PacerController> pacer_controller_
-      RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
 
   std::unique_ptr<send_side_cc_internal::ControlHandler> control_handler_
-      RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
 
   std::unique_ptr<NetworkControllerInterface> controller_
-      RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
 
-  TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_ptr_);
+  TimeDelta process_interval_ RTC_GUARDED_BY(task_queue_);
 
   std::map<uint32_t, RTCPReportBlock> last_report_blocks_
-      RTC_GUARDED_BY(task_queue_ptr_);
-  Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
+  Timestamp last_report_block_time_ RTC_GUARDED_BY(task_queue_);
 
-  NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_ptr_);
-  NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_ptr_);
-  StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_ptr_);
+  NetworkChangedObserver* observer_ RTC_GUARDED_BY(task_queue_);
+  NetworkControllerConfig initial_config_ RTC_GUARDED_BY(task_queue_);
+  StreamsConfig streams_config_ RTC_GUARDED_BY(task_queue_);
 
   const bool send_side_bwe_with_overhead_;
   // Transport overhead is written by OnNetworkRouteChanged and read by
   // AddPacket.
   // TODO(srte): Remove atomic when feedback adapter runs on task queue.
   std::atomic<size_t> transport_overhead_bytes_per_packet_;
-  bool network_available_ RTC_GUARDED_BY(task_queue_ptr_);
-  bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_ptr_);
-  bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_ptr_);
+  bool network_available_ RTC_GUARDED_BY(task_queue_);
+  bool periodic_tasks_enabled_ RTC_GUARDED_BY(task_queue_);
+  bool packet_feedback_available_ RTC_GUARDED_BY(task_queue_);
   send_side_cc_internal::PeriodicTask* pacer_queue_update_task_
-      RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
   send_side_cc_internal::PeriodicTask* controller_task_
-      RTC_GUARDED_BY(task_queue_ptr_);
+      RTC_GUARDED_BY(task_queue_);
 
   // Protects access to last_packet_feedback_vector_ in feedback adapter.
   // TODO(srte): Remove this checker when feedback adapter runs on task queue.
   rtc::RaceChecker worker_race_;
 
-  // Caches task_queue_.get(), to avoid racing with destructor.
-  // Note that this is declared before task_queue_ to ensure that it is not
-  // invalidated until no more tasks can be running on the task queue.
-  rtc::TaskQueue* task_queue_ptr_;
-
-  // Note that moving ownership of the task queue makes it neccessary to make
-  // sure that there is no outstanding tasks on it using destructed objects.
-  // This is currently guranteed by using explicit reset in the destructor of
-  // this class. It is declared last to indicate that it's lifetime is shorter
-  // than all other members.
-  std::unique_ptr<rtc::TaskQueue> task_queue_;
+  rtc::TaskQueue* task_queue_;
 
   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SendSideCongestionController);
 };
diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller.cc b/modules/congestion_controller/rtp/send_side_congestion_controller.cc
index 38d23f8..a55beae 100644
--- a/modules/congestion_controller/rtp/send_side_congestion_controller.cc
+++ b/modules/congestion_controller/rtp/send_side_congestion_controller.cc
@@ -305,6 +305,7 @@
 
 SendSideCongestionController::SendSideCongestionController(
     const Clock* clock,
+    rtc::TaskQueue* task_queue,
     RtcEventLog* event_log,
     PacedSender* pacer,
     int start_bitrate_bps,
@@ -328,8 +329,7 @@
       packet_feedback_available_(false),
       pacer_queue_update_task_(nullptr),
       controller_task_(nullptr),
-      task_queue_(MakeUnique<rtc::TaskQueue>("SendSideCCQueue")) {
-  task_queue_ptr_ = task_queue_.get();
+      task_queue_(task_queue) {
   initial_config_.constraints =
       ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
   RTC_DCHECK(start_bitrate_bps > 0);
@@ -381,13 +381,7 @@
   RTC_DCHECK(controller_);
 }
 
-SendSideCongestionController::~SendSideCongestionController() {
-  // Must be destructed before any objects used by calls on the task queue.
-  task_queue_.reset();
-  // Singe the task queue has been destructed, it is now safe to reset
-  // task_queue_raw_ which is only used by tasks on the task queue.
-  task_queue_ptr_ = nullptr;
-}
+SendSideCongestionController::~SendSideCongestionController() = default;
 
 void SendSideCongestionController::RegisterPacketFeedbackObserver(
     PacketFeedbackObserver* observer) {
@@ -402,7 +396,7 @@
 void SendSideCongestionController::RegisterNetworkObserver(
     NetworkChangedObserver* observer) {
   task_queue_->PostTask([this, observer]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     RTC_DCHECK(observer_ == nullptr);
     observer_ = observer;
     MaybeCreateControllers();
@@ -415,7 +409,7 @@
   TargetRateConstraints constraints =
       ConvertConstraints(min_bitrate_bps, max_bitrate_bps, clock_);
   task_queue_->PostTask([this, constraints, start_bitrate_bps]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     if (controller_) {
       control_handler_->PostUpdates(
           controller_->OnTargetRateConstraints(constraints));
@@ -433,7 +427,7 @@
     int64_t max_total_bitrate_bps) {
   task_queue_->PostTask([this, min_send_bitrate_bps, max_padding_bitrate_bps,
                          max_total_bitrate_bps]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     streams_config_.min_pacing_rate = DataRate::bps(min_send_bitrate_bps);
     streams_config_.max_padding_rate = DataRate::bps(max_padding_bitrate_bps);
     streams_config_.max_total_allocated_bitrate =
@@ -460,7 +454,7 @@
   if (start_bitrate_bps > 0)
     msg.starting_rate = DataRate::bps(start_bitrate_bps);
   task_queue_->PostTask([this, msg]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     if (controller_) {
       control_handler_->PostUpdates(controller_->OnNetworkRouteChange(msg));
     } else {
@@ -479,7 +473,7 @@
   // running on the task queue.
   // TODO(srte): Remove this function when RtpTransportControllerSend stops
   // calling it.
-  RTC_DCHECK_RUN_ON(task_queue_ptr_);
+  RTC_DCHECK_RUN_ON(task_queue_);
   if (!control_handler_) {
     return false;
   }
@@ -500,7 +494,7 @@
 void SendSideCongestionController::SetPerPacketFeedbackAvailable(
     bool available) {
   task_queue_->PostTask([this, available]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     packet_feedback_available_ = available;
     MaybeRecreateControllers();
   });
@@ -508,7 +502,7 @@
 
 void SendSideCongestionController::EnablePeriodicAlrProbing(bool enable) {
   task_queue_->PostTask([this, enable]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     streams_config_.requests_alr_probing = enable;
     UpdateStreamsConfig();
   });
@@ -533,7 +527,7 @@
   msg.at_time = Timestamp::ms(clock_->TimeInMilliseconds());
   msg.network_available = state == kNetworkUp;
   task_queue_->PostTask([this, msg]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     network_available_ = msg.network_available;
     if (controller_) {
       control_handler_->PostUpdates(controller_->OnNetworkAvailability(msg));
@@ -560,7 +554,7 @@
     msg.size = DataSize::bytes(packet->payload_size);
     msg.send_time = Timestamp::ms(packet->send_time_ms);
     task_queue_->PostTask([this, msg]() {
-      RTC_DCHECK_RUN_ON(task_queue_ptr_);
+      RTC_DCHECK_RUN_ON(task_queue_);
       if (controller_)
         control_handler_->PostUpdates(controller_->OnSentPacket(msg));
     });
@@ -575,7 +569,7 @@
   report.round_trip_time = TimeDelta::ms(avg_rtt_ms);
   report.smoothed = true;
   task_queue_->PostTask([this, report]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     if (controller_)
       control_handler_->PostUpdates(controller_->OnRoundTripTimeUpdate(report));
   });
@@ -594,9 +588,9 @@
   if (!periodic_tasks_enabled_)
     return;
   if (!pacer_queue_update_task_) {
-    pacer_queue_update_task_ = StartPeriodicTask(
-        task_queue_ptr_, PacerQueueUpdateIntervalMs, [this]() {
-          RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    pacer_queue_update_task_ =
+        StartPeriodicTask(task_queue_, PacerQueueUpdateIntervalMs, [this]() {
+          RTC_DCHECK_RUN_ON(task_queue_);
           UpdatePacerQueue();
         });
   }
@@ -611,8 +605,8 @@
     // queue is destroyed or some time after Stop() is called, whichever comes
     // first.
     controller_task_ =
-        StartPeriodicTask(task_queue_ptr_, process_interval_.ms(), [this]() {
-          RTC_DCHECK_RUN_ON(task_queue_ptr_);
+        StartPeriodicTask(task_queue_, process_interval_.ms(), [this]() {
+          RTC_DCHECK_RUN_ON(task_queue_);
           UpdateControllerWithTimeInterval();
         });
   }
@@ -668,7 +662,7 @@
     msg.data_in_flight =
         DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
     task_queue_->PostTask([this, msg]() {
-      RTC_DCHECK_RUN_ON(task_queue_ptr_);
+      RTC_DCHECK_RUN_ON(task_queue_);
       if (controller_)
         control_handler_->PostUpdates(
             controller_->OnTransportPacketsFeedback(msg));
@@ -680,7 +674,7 @@
   DataSize in_flight_data =
       DataSize::bytes(transport_feedback_adapter_.GetOutstandingBytes());
   task_queue_->PostTask([this, in_flight_data]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     pacer_controller_->OnOutstandingData(in_flight_data);
   });
 }
@@ -693,7 +687,7 @@
 
 void SendSideCongestionController::PostPeriodicTasksForTest() {
   task_queue_->PostTask([this]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     UpdateControllerWithTimeInterval();
     UpdatePacerQueue();
   });
@@ -707,7 +701,7 @@
 
 void SendSideCongestionController::SetPacingFactor(float pacing_factor) {
   task_queue_->PostTask([this, pacing_factor]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     streams_config_.pacing_factor = pacing_factor;
     UpdateStreamsConfig();
   });
@@ -715,7 +709,7 @@
 
 void SendSideCongestionController::DisablePeriodicTasks() {
   task_queue_->PostTask([this]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     periodic_tasks_enabled_ = false;
   });
 }
@@ -726,7 +720,7 @@
   msg.receive_time = Timestamp::ms(clock_->TimeInMilliseconds());
   msg.bandwidth = DataRate::bps(bitrate);
   task_queue_->PostTask([this, msg]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     if (controller_)
       control_handler_->PostUpdates(controller_->OnRemoteBitrateReport(msg));
   });
@@ -737,12 +731,12 @@
     int64_t rtt_ms,
     int64_t now_ms) {
   task_queue_->PostTask([this, report_blocks, now_ms]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     OnReceivedRtcpReceiverReportBlocks(report_blocks, now_ms);
   });
 
   task_queue_->PostTask([this, now_ms, rtt_ms]() {
-    RTC_DCHECK_RUN_ON(task_queue_ptr_);
+    RTC_DCHECK_RUN_ON(task_queue_);
     RoundTripTimeUpdate report;
     report.receive_time = Timestamp::ms(now_ms);
     report.round_trip_time = TimeDelta::ms(rtt_ms);
diff --git a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc
index 56f010f..0f59808 100644
--- a/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc
+++ b/modules/congestion_controller/rtp/send_side_congestion_controller_unittest.cc
@@ -74,10 +74,10 @@
                 SetPacingRates(kInitialBitrateBps * kDefaultPacingRate, _));
     EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 3));
     EXPECT_CALL(*pacer_, CreateProbeCluster(kInitialBitrateBps * 5));
-
+    task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test");
     controller_.reset(new SendSideCongestionControllerForTest(
-        &clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0,
-        5 * kInitialBitrateBps));
+        &clock_, task_queue_.get(), &event_log_, pacer_.get(),
+        kInitialBitrateBps, 0, 5 * kInitialBitrateBps));
     controller_->DisablePeriodicTasks();
     controller_->RegisterNetworkObserver(&observer_);
     controller_->SignalNetworkState(NetworkState::kNetworkUp);
@@ -94,9 +94,10 @@
   void TargetBitrateTrackingSetup() {
     bandwidth_observer_ = nullptr;
     pacer_.reset(new NiceMock<MockPacedSender>());
+    task_queue_ = rtc::MakeUnique<rtc::TaskQueue>("SSCC Test");
     controller_.reset(new SendSideCongestionControllerForTest(
-        &clock_, &event_log_, pacer_.get(), kInitialBitrateBps, 0,
-        5 * kInitialBitrateBps));
+        &clock_, task_queue_.get(), &event_log_, pacer_.get(),
+        kInitialBitrateBps, 0, 5 * kInitialBitrateBps));
     controller_->DisablePeriodicTasks();
     controller_->RegisterNetworkObserver(&target_bitrate_observer_);
     controller_->SignalNetworkState(NetworkState::kNetworkUp);
@@ -166,8 +167,8 @@
   PacketRouter packet_router_;
   std::unique_ptr<NiceMock<MockPacedSender>> pacer_;
   std::unique_ptr<SendSideCongestionControllerForTest> controller_;
-
   rtc::Optional<uint32_t> target_bitrate_bps_;
+  std::unique_ptr<rtc::TaskQueue> task_queue_;
 };
 
 TEST_F(SendSideCongestionControllerTest, OnNetworkChanged) {