Revert "[DataChannel] Send and receive packets on the network thread."

This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.

Reason for revert: Speculative revert; may be breaking a downstream project.

Original change's description:
> [DataChannel] Send and receive packets on the network thread.
>
> This updates sctp channels, including work that happens between the
> data channel controller and the transport, to run on the network
> thread. Previously all network traffic related to data channels was
> routed through the signaling thread before going to either the network
> thread or the caller's thread (e.g. the JS thread in Chrome). Now the
> calls can go straight from the network thread to the JS thread by
> enabling a special flag on the observer (see below), and similarly,
> calls to send data involve 2 threads instead of 3.
>
> * Custom data channel observer adapter implementation that
>   maintains compatibility with existing observer implementations in
>   that notifications are delivered on the signaling thread.
>   The adapter can be explicitly disabled for implementations that
>   want to optimize the callback path and promise to not block the
>   network thread.
> * Remove the signaling thread copy of data channels in the controller.
> * Remove several PostTask operations that were needed to keep things
>   in sync (but the need has gone away).
> * Update tests for the controller to consistently call
>   TeardownDataChannelTransport_n to match with production.
> * Update stats collectors (current and legacy) to fetch the data
>   channel stats on the network thread where they're maintained.
> * Remove the AsyncChannelCloseTeardown test since the async teardown
>   step has gone away.
> * Remove `sid_s` in the channel code since we only need the network
>   state now.
> * To support custom observers (with and without the data adapter) and
>   maintain compatibility with existing implementations, added a new
>   proxy macro that allows an implementation to selectively provide
>   its own implementation without being proxied. This is used for
>   registering/unregistering a data channel observer.
> * Update the data channel proxy to map most methods to the network
>   thread, avoiding the interim jump to the signaling thread.
> * Update a plethora of thread checkers from signaling to network.
>
> Bug: webrtc:11547
> Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Reviewed-by: Henrik Boström <hbos@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39760}

Bug: webrtc:11547
Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
Auto-Submit: Andrey Logvin <landrey@webrtc.org>
Owners-Override: Andrey Logvin <landrey@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39764}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 20d5fe9..36e8be1 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -29,15 +29,8 @@
 }
 
 bool DataChannelController::HasDataChannelsForTest() const {
-  auto has_channels = [&] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    return !sctp_data_channels_n_.empty();
-  };
-
-  if (network_thread()->IsCurrent())
-    return has_channels();
-
-  return network_thread()->BlockingCall(std::move(has_channels));
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return !sctp_data_channels_.empty();
 }
 
 bool DataChannelController::HasUsedDataChannels() const {
@@ -73,15 +66,11 @@
 void DataChannelController::OnChannelStateChanged(
     SctpDataChannel* channel,
     DataChannelInterface::DataState state) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   if (state == DataChannelInterface::DataState::kClosed)
     OnSctpDataChannelClosed(channel);
 
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(),
-               [this, channel_id = channel->internal_id(), state = state] {
-                 pc_->OnSctpDataChannelStateChanged(channel_id, state);
-               }));
+  pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
 }
 
 void DataChannelController::OnDataReceived(
@@ -93,22 +82,27 @@
   if (HandleOpenMessage_n(channel_id, type, buffer))
     return;
 
-  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
-    return c->sid_n().stream_id_int() == channel_id;
-  });
-
-  if (it != sctp_data_channels_n_.end())
-    (*it)->OnDataReceived(type, buffer);
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): The data being received should be
+        // delivered on the network thread.
+        auto it = FindChannel(StreamId(channel_id));
+        if (it != sctp_data_channels_.end())
+          (*it)->OnDataReceived(type, buffer);
+      }));
 }
 
 void DataChannelController::OnChannelClosing(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
-  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
-    return c->sid_n().stream_id_int() == channel_id;
-  });
-
-  if (it != sctp_data_channels_n_.end())
-    (*it)->OnClosingProcedureStartedRemotely();
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, channel_id] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): Should run on the network thread.
+        auto it = FindChannel(StreamId(channel_id));
+        if (it != sctp_data_channels_.end())
+          (*it)->OnClosingProcedureStartedRemotely();
+      }));
 }
 
 void DataChannelController::OnChannelClosed(int channel_id) {
@@ -118,44 +112,48 @@
   auto it = absl::c_find_if(sctp_data_channels_n_,
                             [&](const auto& c) { return c->sid_n() == sid; });
 
-  if (it != sctp_data_channels_n_.end()) {
-    rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+  if (it != sctp_data_channels_n_.end())
     sctp_data_channels_n_.erase(it);
-    channel->OnClosingProcedureComplete();
-  }
+
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    auto it = FindChannel(sid);
+    // Remove the channel from our list, close it and free up resources.
+    if (it != sctp_data_channels_.end()) {
+      rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+      // Note: this causes OnSctpDataChannelClosed() to not do anything
+      // when called from within `OnClosingProcedureComplete`.
+      sctp_data_channels_.erase(it);
+
+      channel->OnClosingProcedureComplete();
+    }
+  }));
 }
 
 void DataChannelController::OnReadyToSend() {
   RTC_DCHECK_RUN_ON(network_thread());
-  auto copy = sctp_data_channels_n_;
-  for (const auto& channel : copy) {
-    if (channel->sid_n().HasValue()) {
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    auto copy = sctp_data_channels_;
+    for (const auto& channel : copy)
       channel->OnTransportReady();
-    } else {
-      // This happens for role==SSL_SERVER channels when we get notified by
-      // the transport *before* the SDP code calls `AllocateSctpSids` to
-      // trigger assignment of sids. In this case OnTransportReady() will be
-      // called from within `AllocateSctpSids` below.
-      RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
-    }
-  }
+  }));
 }
 
 void DataChannelController::OnTransportClosed(RTCError error) {
   RTC_DCHECK_RUN_ON(network_thread());
-  // This loop will close all data channels and trigger a callback to
-  // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
-  // we create a local copy while we do the fan-out.
-  auto copy = sctp_data_channels_n_;
-  for (const auto& channel : copy)
-    channel->OnTransportChannelClosed(error);
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, error] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        OnTransportChannelClosed(error);
+      }));
 }
 
 void DataChannelController::SetupDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
 
   // There's a new data channel transport.  This needs to be signaled to the
-  // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
+  // `sctp_data_channels_` so that they can reopen and reconnect.  This is
   // necessary when bundling is applied.
   NotifyDataChannelsOfTransportCreated();
 }
@@ -167,12 +165,11 @@
 
 void DataChannelController::TeardownDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
-  if (data_channel_transport_) {
-    data_channel_transport_->SetDataSink(nullptr);
-    set_data_channel_transport(nullptr);
+  if (data_channel_transport()) {
+    data_channel_transport()->SetDataSink(nullptr);
   }
+  set_data_channel_transport(nullptr);
   sctp_data_channels_n_.clear();
-  weak_factory_.InvalidateWeakPtrs();
 }
 
 void DataChannelController::OnTransportChanged(
@@ -188,7 +185,7 @@
       new_data_channel_transport->SetDataSink(this);
 
       // There's a new data channel transport.  This needs to be signaled to the
-      // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
+      // `sctp_data_channels_` so that they can reopen and reconnect.  This is
       // necessary when bundling is applied.
       NotifyDataChannelsOfTransportCreated();
     }
@@ -197,10 +194,10 @@
 
 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
     const {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   std::vector<DataChannelStats> stats;
-  stats.reserve(sctp_data_channels_n_.size());
-  for (const auto& channel : sctp_data_channels_n_)
+  stats.reserve(sctp_data_channels_.size());
+  for (const auto& channel : sctp_data_channels_)
     stats.push_back(channel->GetStats());
   return stats;
 }
@@ -222,38 +219,28 @@
                         << channel_id;
   } else {
     config.open_handshake_role = InternalDataChannelInit::kAcker;
-    auto channel_or_error = CreateDataChannel(label, config);
-    if (channel_or_error.ok()) {
-      signaling_thread()->PostTask(SafeTask(
-          signaling_safety_.flag(),
-          [this, channel = channel_or_error.MoveValue(),
-           ready_to_send = data_channel_transport_->IsReadyToSend()] {
-            RTC_DCHECK_RUN_ON(signaling_thread());
-            OnDataChannelOpenMessage(std::move(channel), ready_to_send);
-          }));
-    } else {
-      RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
-                        << ToString(channel_or_error.error().type());
-    }
+    signaling_thread()->PostTask(
+        SafeTask(signaling_safety_.flag(),
+                 [this, label = std::move(label), config = std::move(config)] {
+                   RTC_DCHECK_RUN_ON(signaling_thread());
+                   OnDataChannelOpenMessage(label, config);
+                 }));
   }
   return true;
 }
 
 void DataChannelController::OnDataChannelOpenMessage(
-    rtc::scoped_refptr<SctpDataChannel> channel,
-    bool ready_to_send) {
-  has_used_data_channels_ = true;
-  auto proxy = SctpDataChannel::CreateProxy(channel);
-
-  pc_->Observer()->OnDataChannel(proxy);
-  pc_->NoteDataAddedEvent();
-
-  if (ready_to_send) {
-    network_thread()->PostTask([channel = std::move(channel)] {
-      if (channel->state() != DataChannelInterface::DataState::kClosed)
-        channel->OnTransportReady();
-    });
+    const std::string& label,
+    const InternalDataChannelInit& config) {
+  auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
+  if (!channel_or_error.ok()) {
+    RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+                      << ToString(channel_or_error.error().type());
+    return;
   }
+
+  pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
+  pc_->NoteDataAddedEvent();
 }
 
 // RTC_RUN_ON(network_thread())
@@ -282,31 +269,6 @@
   return RTCError::OK();
 }
 
-// RTC_RUN_ON(network_thread())
-RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
-DataChannelController::CreateDataChannel(const std::string& label,
-                                         InternalDataChannelInit& config) {
-  StreamId sid(config.id);
-  RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
-  if (!err.ok())
-    return err;
-
-  // In case `sid` has changed. Update `config` accordingly.
-  config.id = sid.stream_id_int();
-
-  rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
-      weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
-      config, signaling_thread(), network_thread());
-  RTC_DCHECK(channel);
-  sctp_data_channels_n_.push_back(channel);
-
-  // If we have an id already, notify the transport.
-  if (sid.HasValue())
-    AddSctpDataStream(sid);
-
-  return channel;
-}
-
 RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
 DataChannelController::InternalCreateDataChannelWithProxy(
     const std::string& label,
@@ -321,25 +283,29 @@
   bool ready_to_send = false;
   InternalDataChannelInit new_config = config;
   StreamId sid(new_config.id);
+  auto weak_ptr = weak_factory_.GetWeakPtr();
+  RTC_DCHECK(weak_ptr);  // Associate with current thread.
   auto ret = network_thread()->BlockingCall(
       [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
         RTC_DCHECK_RUN_ON(network_thread());
-        auto channel = CreateDataChannel(label, new_config);
-        if (!channel.ok())
-          return channel;
+        RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
+        if (!err.ok())
+          return err;
+
+        // In case `sid` has changed. Update `new_config` accordingly.
+        new_config.id = sid.stream_id_int();
         ready_to_send =
             data_channel_transport_ && data_channel_transport_->IsReadyToSend();
-        if (ready_to_send) {
-          // If the transport is ready to send because the initial channel
-          // ready signal may have been sent before the DataChannel creation.
-          // This has to be done async because the upper layer objects (e.g.
-          // Chrome glue and WebKit) are not wired up properly until after
-          // `InternalCreateDataChannelWithProxy` returns.
-          network_thread()->PostTask([channel = channel.value()] {
-            if (channel->state() != DataChannelInterface::DataState::kClosed)
-              channel->OnTransportReady();
-          });
-        }
+
+        rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+            std::move(weak_ptr), label, data_channel_transport_ != nullptr,
+            new_config, signaling_thread(), network_thread()));
+        RTC_DCHECK(channel);
+        sctp_data_channels_n_.push_back(channel);
+
+        // If we have an id already, notify the transport.
+        if (sid.HasValue())
+          AddSctpDataStream(sid);
 
         return channel;
       });
@@ -347,71 +313,114 @@
   if (!ret.ok())
     return ret.MoveError();
 
+  if (ready_to_send) {
+    // Checks if the transport is ready to send because the initial channel
+    // ready signal may have been sent before the DataChannel creation.
+    // This has to be done async because the upper layer objects (e.g.
+    // Chrome glue and WebKit) are not wired up properly until after this
+    // function returns.
+    signaling_thread()->PostTask(
+        SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
+          if (channel->state() != DataChannelInterface::DataState::kClosed)
+            channel->OnTransportReady();
+        }));
+  }
+
+  sctp_data_channels_.push_back(ret.value());
   has_used_data_channels_ = true;
   return SctpDataChannel::CreateProxy(ret.MoveValue());
 }
 
 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
-  RTC_DCHECK_RUN_ON(network_thread());
-
-  const bool ready_to_send =
-      data_channel_transport_ && data_channel_transport_->IsReadyToSend();
+  RTC_DCHECK_RUN_ON(signaling_thread());
 
   std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
-  for (auto it = sctp_data_channels_n_.begin();
-       it != sctp_data_channels_n_.end();) {
-    if (!(*it)->sid_n().HasValue()) {
-      StreamId sid = sid_allocator_.AllocateSid(role);
-      if (sid.HasValue()) {
-        (*it)->SetSctpSid_n(sid);
-        AddSctpDataStream(sid);
-        if (ready_to_send) {
-          RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
-          (*it)->OnTransportReady();
+
+  network_thread()->BlockingCall([&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    for (auto it = sctp_data_channels_n_.begin();
+         it != sctp_data_channels_n_.end();) {
+      if (!(*it)->sid_n().HasValue()) {
+        StreamId sid = sid_allocator_.AllocateSid(role);
+        if (sid.HasValue()) {
+          (*it)->SetSctpSid_n(sid);
+          AddSctpDataStream(sid);
+          channels_to_update.push_back(std::make_pair((*it).get(), sid));
+        } else {
+          channels_to_close.push_back(std::move(*it));
+          it = sctp_data_channels_n_.erase(it);
+          continue;
         }
-        channels_to_update.push_back(std::make_pair((*it).get(), sid));
-      } else {
-        channels_to_close.push_back(std::move(*it));
-        it = sctp_data_channels_n_.erase(it);
-        continue;
       }
+      ++it;
     }
-    ++it;
-  }
+  });
 
   // Since closing modifies the list of channels, we have to do the actual
   // closing outside the loop.
   for (const auto& channel : channels_to_close) {
     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+    // The channel should now have been removed from sctp_data_channels_.
+    RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+                 return c.get() == channel.get();
+               }) == sctp_data_channels_.end());
+  }
+
+  for (auto& pair : channels_to_update) {
+    auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+      return c.get() == pair.first;
+    });
+    RTC_DCHECK(it != sctp_data_channels_.end());
+    (*it)->SetSctpSid_s(pair.second);
   }
 }
 
 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
-  RTC_DCHECK_RUN_ON(network_thread());
-  // After the closing procedure is done, it's safe to use this ID for
-  // another data channel.
-  if (channel->sid_n().HasValue()) {
-    sid_allocator_.ReleaseSid(channel->sid_n());
+  RTC_DCHECK_RUN_ON(signaling_thread());
+
+  network_thread()->BlockingCall([&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    // After the closing procedure is done, it's safe to use this ID for
+    // another data channel.
+    if (channel->sid_n().HasValue()) {
+      sid_allocator_.ReleaseSid(channel->sid_n());
+    }
+
+    auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+      return c.get() == channel;
+    });
+
+    if (it != sctp_data_channels_n_.end())
+      sctp_data_channels_n_.erase(it);
+  });
+
+  for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+       ++it) {
+    if (it->get() == channel) {
+      // Since this method is triggered by a signal from the DataChannel,
+      // we can't free it directly here; we need to free it asynchronously.
+      rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
+      sctp_data_channels_.erase(it);
+      signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
+                                            [release = std::move(release)] {}));
+      return;
+    }
   }
-  auto it = absl::c_find_if(sctp_data_channels_n_,
-                            [&](const auto& c) { return c.get() == channel; });
-  if (it != sctp_data_channels_n_.end())
-    sctp_data_channels_n_.erase(it);
 }
 
 void DataChannelController::OnTransportChannelClosed(RTCError error) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   // Use a temporary copy of the SCTP DataChannel list because the
   // DataChannel may callback to us and try to modify the list.
   // TODO(tommi): `OnTransportChannelClosed` is called from
   // `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
   // `TeardownDataChannelTransport_n` is called (but on the network thread) from
-  // the same function. We can now get rid of this function
-  // (OnTransportChannelClosed) and run this loop from within the
-  // TeardownDataChannelTransport_n callback.
+  // the same function. Once `sctp_data_channels_` moves to the network thread,
+  // we can get rid of this function (OnTransportChannelClosed) and run this
+  // loop from within the TeardownDataChannelTransport_n callback.
   std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
-  temp_sctp_dcs.swap(sctp_data_channels_n_);
+  temp_sctp_dcs.swap(sctp_data_channels_);
   for (const auto& channel : temp_sctp_dcs) {
     channel->OnTransportChannelClosed(error);
   }
@@ -435,10 +444,16 @@
     StreamId sid,
     const SendDataParams& params,
     const rtc::CopyOnWriteBuffer& payload) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+  // thread instead. Remove the BlockingCall() below and move associated state
+  // to the network thread.
+  RTC_DCHECK_RUN_ON(signaling_thread());
   RTC_DCHECK(data_channel_transport());
-  return data_channel_transport()->SendData(sid.stream_id_int(), params,
-                                            payload);
+
+  return network_thread()->BlockingCall([this, sid, params, payload] {
+    return data_channel_transport()->SendData(sid.stream_id_int(), params,
+                                              payload);
+  });
 }
 
 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@@ -448,8 +463,22 @@
   for (const auto& channel : sctp_data_channels_n_) {
     if (channel->sid_n().HasValue())
       AddSctpDataStream(channel->sid_n());
-    channel->OnTransportChannelCreated();
   }
+
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    for (const auto& channel : sctp_data_channels_) {
+      channel->OnTransportChannelCreated();
+    }
+  }));
+}
+
+std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
+DataChannelController::FindChannel(StreamId stream_id) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+    return c->sid_s() == stream_id;
+  });
 }
 
 rtc::Thread* DataChannelController::network_thread() const {