Prepare to avoid hops to worker for network events.

This moves the thread hop for network events, from BaseChannel and
into Call. The reason for this is to move the control over those hops
(including DeliverPacket[Async]) into the same class where the state
is held that is affected by those hops. Once that's done, we can start
moving the relevant network state over to the network thread and
eventually remove the hops.

I'm also adding several TODOs for tracking future steps and give
developers a heads up.

Bug: webrtc:11993
Change-Id: Ice7ee3b5b6893532df52039324293979196d341d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/204800
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33138}
diff --git a/audio/audio_receive_stream.cc b/audio/audio_receive_stream.cc
index d6f6140..03dd4c0 100644
--- a/audio/audio_receive_stream.cc
+++ b/audio/audio_receive_stream.cc
@@ -341,6 +341,7 @@
 }
 
 void AudioReceiveStream::AssociateSendStream(AudioSendStream* send_stream) {
+  // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
   RTC_DCHECK_RUN_ON(&worker_thread_checker_);
   channel_receive_->SetAssociatedSendChannel(
       send_stream ? send_stream->GetChannel() : nullptr);
@@ -362,6 +363,8 @@
 
 const AudioSendStream* AudioReceiveStream::GetAssociatedSendStreamForTesting()
     const {
+  // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread or
+  // remove test method and |associated_send_stream_| variable.
   RTC_DCHECK_RUN_ON(&worker_thread_checker_);
   return associated_send_stream_;
 }
diff --git a/audio/channel_receive.cc b/audio/channel_receive.cc
index a8015c8..5c2b918 100644
--- a/audio/channel_receive.cc
+++ b/audio/channel_receive.cc
@@ -787,6 +787,7 @@
 
 void ChannelReceive::SetAssociatedSendChannel(
     const ChannelSendInterface* channel) {
+  // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
   RTC_DCHECK(worker_thread_checker_.IsCurrent());
   MutexLock lock(&assoc_send_channel_lock_);
   associated_send_channel_ = channel;
diff --git a/call/call.cc b/call/call.cc
index f20f4b5..46bf528 100644
--- a/call/call.cc
+++ b/call/call.cc
@@ -335,15 +335,18 @@
 
   NetworkState audio_network_state_;
   NetworkState video_network_state_;
+  // TODO(bugs.webrtc.org/11993): Move aggregate_network_up_ over to the
+  // network thread.
   bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
 
   // Audio, Video, and FlexFEC receive streams are owned by the client that
   // creates them.
+  // TODO(bugs.webrtc.org/11993): Move audio_receive_streams_,
+  // video_receive_streams_ and sync_stream_mapping_ over to the network thread.
   std::set<AudioReceiveStream*> audio_receive_streams_
       RTC_GUARDED_BY(worker_thread_);
   std::set<VideoReceiveStream2*> video_receive_streams_
       RTC_GUARDED_BY(worker_thread_);
-
   std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
       RTC_GUARDED_BY(worker_thread_);
 
@@ -378,6 +381,9 @@
     // send side BWE are negotiated.
     const bool use_send_side_bwe;
   };
+
+  // TODO(bugs.webrtc.org/11993): Move receive_rtp_config_ over to the
+  // network thread.
   std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
       RTC_GUARDED_BY(worker_thread_);
 
@@ -800,6 +806,8 @@
              audio_send_ssrcs_.end());
   audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
 
+  // TODO(bugs.webrtc.org/11993): call AssociateSendStream and
+  // UpdateAggregateNetworkState asynchronously on the network thread.
   for (AudioReceiveStream* stream : audio_receive_streams_) {
     if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
       stream->AssociateSendStream(send_stream);
@@ -807,6 +815,7 @@
   }
 
   UpdateAggregateNetworkState();
+
   return send_stream;
 }
 
@@ -825,6 +834,8 @@
   size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
   RTC_DCHECK_EQ(1, num_deleted);
 
+  // TODO(bugs.webrtc.org/11993): call AssociateSendStream and
+  // UpdateAggregateNetworkState asynchronously on the network thread.
   for (AudioReceiveStream* stream : audio_receive_streams_) {
     if (stream->config().rtp.local_ssrc == ssrc) {
       stream->AssociateSendStream(nullptr);
@@ -832,6 +843,7 @@
   }
 
   UpdateAggregateNetworkState();
+
   delete send_stream;
 }
 
@@ -842,11 +854,19 @@
   EnsureStarted();
   event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
       CreateRtcLogStreamConfig(config)));
+
+  // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
+  // and |audio_receiver_controller_| out of AudioReceiveStream construction and
+  // set it up asynchronously on the network thread (the registration and
+  // |audio_receiver_controller_| need to live on the network thread).
   AudioReceiveStream* receive_stream = new AudioReceiveStream(
       clock_, &audio_receiver_controller_, transport_send_ptr_->packet_router(),
       module_process_thread_->process_thread(), config_.neteq_factory, config,
       config_.audio_state, event_log_);
 
+  // TODO(bugs.webrtc.org/11993): Update the below on the network thread.
+  // We could possibly set up the audio_receiver_controller_ association up
+  // as part of the async setup.
   receive_rtp_config_.emplace(config.rtp.remote_ssrc, ReceiveRtpConfig(config));
   audio_receive_streams_.insert(receive_stream);
 
@@ -873,8 +893,12 @@
   uint32_t ssrc = config.rtp.remote_ssrc;
   receive_side_cc_.GetRemoteBitrateEstimator(UseSendSideBwe(config))
       ->RemoveStream(ssrc);
+
+  // TODO(bugs.webrtc.org/11993): Access the map, rtp config, call ConfigureSync
+  // and UpdateAggregateNetworkState on the network thread.
   audio_receive_streams_.erase(audio_receive_stream);
   const std::string& sync_group = audio_receive_stream->config().sync_group;
+
   const auto it = sync_stream_mapping_.find(sync_group);
   if (it != sync_stream_mapping_.end() && it->second == audio_receive_stream) {
     sync_stream_mapping_.erase(it);
@@ -883,6 +907,9 @@
   receive_rtp_config_.erase(ssrc);
 
   UpdateAggregateNetworkState();
+  // TODO(bugs.webrtc.org/11993): Consider if deleting |audio_receive_stream|
+  // on the network thread would be better or if we'd need to tear down the
+  // state in two phases.
   delete audio_receive_stream;
 }
 
@@ -995,13 +1022,15 @@
 
   EnsureStarted();
 
-  TaskQueueBase* current = GetCurrentTaskQueueOrThread();
-  RTC_CHECK(current);
+  // TODO(bugs.webrtc.org/11993): Move the registration between |receive_stream|
+  // and |video_receiver_controller_| out of VideoReceiveStream2 construction
+  // and set it up asynchronously on the network thread (the registration and
+  // |video_receiver_controller_| need to live on the network thread).
   VideoReceiveStream2* receive_stream = new VideoReceiveStream2(
-      task_queue_factory_, current, &video_receiver_controller_, num_cpu_cores_,
-      transport_send_ptr_->packet_router(), std::move(configuration),
-      module_process_thread_->process_thread(), call_stats_.get(), clock_,
-      new VCMTiming(clock_));
+      task_queue_factory_, worker_thread_, &video_receiver_controller_,
+      num_cpu_cores_, transport_send_ptr_->packet_router(),
+      std::move(configuration), module_process_thread_->process_thread(),
+      call_stats_.get(), clock_, new VCMTiming(clock_));
 
   const webrtc::VideoReceiveStream::Config& config = receive_stream->config();
   if (config.rtp.rtx_ssrc) {
@@ -1134,34 +1163,54 @@
 }
 
 void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
-  RTC_DCHECK_RUN_ON(worker_thread_);
-  switch (media) {
-    case MediaType::AUDIO:
-      audio_network_state_ = state;
-      break;
-    case MediaType::VIDEO:
-      video_network_state_ = state;
-      break;
-    case MediaType::ANY:
-    case MediaType::DATA:
-      RTC_NOTREACHED();
-      break;
-  }
+  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK(media == MediaType::AUDIO || media == MediaType::VIDEO);
 
-  UpdateAggregateNetworkState();
-  for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
-    video_receive_stream->SignalNetworkState(video_network_state_);
+  auto closure = [this, media, state]() {
+    // TODO(bugs.webrtc.org/11993): Move this over to the network thread.
+    RTC_DCHECK_RUN_ON(worker_thread_);
+    if (media == MediaType::AUDIO) {
+      audio_network_state_ = state;
+    } else {
+      RTC_DCHECK_EQ(media, MediaType::VIDEO);
+      video_network_state_ = state;
+    }
+
+    // TODO(tommi): Is it necessary to always do this, including if there
+    // was no change in state?
+    UpdateAggregateNetworkState();
+
+    // TODO(tommi): Is it right to do this if media == AUDIO?
+    for (VideoReceiveStream2* video_receive_stream : video_receive_streams_) {
+      video_receive_stream->SignalNetworkState(video_network_state_);
+    }
+  };
+
+  if (network_thread_ == worker_thread_) {
+    closure();
+  } else {
+    // TODO(bugs.webrtc.org/11993): Remove workaround when we no longer need to
+    // post to the worker thread.
+    worker_thread_->PostTask(ToQueuedTask(task_safety_, std::move(closure)));
   }
 }
 
 void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
-  RTC_DCHECK_RUN_ON(worker_thread_);
-  for (auto& kv : audio_send_ssrcs_) {
-    kv.second->SetTransportOverhead(transport_overhead_per_packet);
-  }
+  RTC_DCHECK_RUN_ON(network_thread_);
+  worker_thread_->PostTask(
+      ToQueuedTask(task_safety_, [this, transport_overhead_per_packet]() {
+        // TODO(bugs.webrtc.org/11993): Move this over to the network thread.
+        RTC_DCHECK_RUN_ON(worker_thread_);
+        for (auto& kv : audio_send_ssrcs_) {
+          kv.second->SetTransportOverhead(transport_overhead_per_packet);
+        }
+      }));
 }
 
 void Call::UpdateAggregateNetworkState() {
+  // TODO(bugs.webrtc.org/11993): Move this over to the network thread.
+  // RTC_DCHECK_RUN_ON(network_thread_);
+
   RTC_DCHECK_RUN_ON(worker_thread_);
 
   bool have_audio =
@@ -1241,6 +1290,7 @@
 }
 
 void Call::ConfigureSync(const std::string& sync_group) {
+  // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
   // Set sync only if there was no previous one.
   if (sync_group.empty())
     return;
@@ -1452,6 +1502,9 @@
 }
 
 void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
+  // TODO(bugs.webrtc.org/11993): Expect to be called on the network thread.
+  // This method is called synchronously via |OnRtpPacket()| (see DeliverRtp)
+  // on the same thread.
   RTC_DCHECK_RUN_ON(worker_thread_);
   RtpPacketReceived parsed_packet;
   if (!parsed_packet.Parse(packet, length))
diff --git a/media/engine/webrtc_video_engine.cc b/media/engine/webrtc_video_engine.cc
index 161c075..244a1a9 100644
--- a/media/engine/webrtc_video_engine.cc
+++ b/media/engine/webrtc_video_engine.cc
@@ -1803,7 +1803,7 @@
 }
 
 void WebRtcVideoChannel::OnReadyToSend(bool ready) {
-  RTC_DCHECK_RUN_ON(&thread_checker_);
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
   RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
   call_->SignalChannelNetworkState(
       webrtc::MediaType::VIDEO,
@@ -1813,11 +1813,11 @@
 void WebRtcVideoChannel::OnNetworkRouteChanged(
     const std::string& transport_name,
     const rtc::NetworkRoute& network_route) {
-  RTC_DCHECK_RUN_ON(&thread_checker_);
-  call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
-                                                             network_route);
-  call_->GetTransportControllerSend()->OnTransportOverheadChanged(
-      network_route.packet_overhead);
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
+  webrtc::RtpTransportControllerSendInterface* transport =
+      call_->GetTransportControllerSend();
+  transport->OnNetworkRouteChanged(transport_name, network_route);
+  transport->OnTransportOverheadChanged(network_route.packet_overhead);
 }
 
 void WebRtcVideoChannel::SetInterface(NetworkInterface* iface) {
diff --git a/media/engine/webrtc_voice_engine.cc b/media/engine/webrtc_voice_engine.cc
index 9efef3a..0df96f3 100644
--- a/media/engine/webrtc_voice_engine.cc
+++ b/media/engine/webrtc_voice_engine.cc
@@ -2290,7 +2290,7 @@
 void WebRtcVoiceMediaChannel::OnNetworkRouteChanged(
     const std::string& transport_name,
     const rtc::NetworkRoute& network_route) {
-  RTC_DCHECK_RUN_ON(worker_thread_);
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
   call_->GetTransportControllerSend()->OnNetworkRouteChanged(transport_name,
                                                              network_route);
   call_->OnAudioTransportOverheadChanged(network_route.packet_overhead);
@@ -2335,7 +2335,7 @@
 }
 
 void WebRtcVoiceMediaChannel::OnReadyToSend(bool ready) {
-  RTC_DCHECK_RUN_ON(worker_thread_);
+  RTC_DCHECK_RUN_ON(&network_thread_checker_);
   RTC_LOG(LS_VERBOSE) << "OnReadyToSend: " << (ready ? "Ready." : "Not ready.");
   call_->SignalChannelNetworkState(
       webrtc::MediaType::AUDIO,
diff --git a/pc/channel.cc b/pc/channel.cc
index b672a96..16e2263 100644
--- a/pc/channel.cc
+++ b/pc/channel.cc
@@ -369,7 +369,7 @@
 
 void BaseChannel::OnNetworkRouteChanged(
     absl::optional<rtc::NetworkRoute> network_route) {
-  RTC_LOG(LS_INFO) << "Network route for " << ToString() << " was changed.";
+  RTC_LOG(LS_INFO) << "Network route changed for " << ToString();
 
   RTC_DCHECK_RUN_ON(network_thread());
   rtc::NetworkRoute new_route;
@@ -380,10 +380,7 @@
   // use the same transport name and MediaChannel::OnNetworkRouteChanged cannot
   // work correctly. Intentionally leave it broken to simplify the code and
   // encourage the users to stop using non-muxing RTCP.
-  worker_thread_->PostTask(ToQueuedTask(alive_, [this, new_route] {
-    RTC_DCHECK_RUN_ON(worker_thread());
-    media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
-  }));
+  media_channel_->OnNetworkRouteChanged(transport_name_, new_route);
 }
 
 sigslot::signal1<ChannelInterface*>& BaseChannel::SignalFirstPacketReceived() {
@@ -399,10 +396,8 @@
 }
 
 void BaseChannel::OnTransportReadyToSend(bool ready) {
-  worker_thread_->PostTask(ToQueuedTask(alive_, [this, ready] {
-    RTC_DCHECK_RUN_ON(worker_thread());
-    media_channel_->OnReadyToSend(ready);
-  }));
+  RTC_DCHECK_RUN_ON(network_thread());
+  media_channel_->OnReadyToSend(ready);
 }
 
 bool BaseChannel::SendPacket(bool rtcp,
diff --git a/pc/channel_unittest.cc b/pc/channel_unittest.cc
index f2e93d6..4a0a6b4 100644
--- a/pc/channel_unittest.cc
+++ b/pc/channel_unittest.cc
@@ -1205,11 +1205,13 @@
     CreateChannels(0, 0);
     EXPECT_FALSE(media_channel1_->ready_to_send());
 
-    channel1_->OnTransportReadyToSend(true);
+    network_thread_->PostTask(
+        RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(true); });
     WaitForThreads();
     EXPECT_TRUE(media_channel1_->ready_to_send());
 
-    channel1_->OnTransportReadyToSend(false);
+    network_thread_->PostTask(
+        RTC_FROM_HERE, [this] { channel1_->OnTransportReadyToSend(false); });
     WaitForThreads();
     EXPECT_FALSE(media_channel1_->ready_to_send());
   }