Reland "[DataChannel] Send and receive packets on the network thread."

This reverts commit 7f16fcda0fd5bb625584b71311dd37b54c096136.

Reason for reland: Re-landing after addressing issues in downstream
code and hardening the ObserverAdapter from situations where attempted
usage of data channel proxies could occur after shutting down the
peer connection and terminating the network thread.

Original change's description:
> Revert "[DataChannel] Send and receive packets on the network thread."
>
> This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.
>
> Reason for revert: Speculative revert, may be breaking downstream project
>
> Original change's description:
> > [DataChannel] Send and receive packets on the network thread.
> >
> > This updates sctp channels, including work that happens between the
> > data channel controller and the transport, to run on the network
> > thread. Previously all network traffic related to data channels was
> > routed through the signaling thread before going to either the network
> > thread or the caller's thread (e.g. js thread in chrome). Now the
> > calls can go straight from the network thread to the JS thread with
> > enabling a special flag on the observer (see below) and similarly
> > calls to send data, involve 2 threads instead of 3.
> >
> > * Custom data channel observer adapter implementation that
> >   maintains compatibility with existing observer implementations in
> >   that notifications are delivered on the signaling thread.
> >   The adapter can be explicitly disabled for implementations that
> >   want to optimize the callback path and promise to not block the
> >   network thread.
> > * Remove the signaling thread copy of data channels in the controller.
> > * Remove several PostTask operations that were needed to keep things
> >   in sync (but the need has gone away).
> > * Update tests for the controller to consistently call
> >   TeardownDataChannelTransport_n to match with production.
> > * Update stats collectors (current and legacy) to fetch the data
> >   channel stats on the network thread where they're maintained.
> > * Remove the AsyncChannelCloseTeardown test since the async teardown
> >   step has gone away.
> > * Remove `sid_s` in the channel code since we only need the network
> >   state now.
> > * For the custom observer support (with and without data adapter) and
> >   maintain compatibility with existing implementations, added a new
> >   proxy macro that allows an implementation to selectively provide
> >   its own implementation without being proxied. This is used for
> >   registering/unregistering a data channel observer.
> > * Update the data channel proxy to map most methods to the network
> >   thread, avoiding the interim jump to the signaling thread.
> > * Update a plethora of thread checkers from signaling to network.
> >
> > Bug: webrtc:11547
> > Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> > Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> > Reviewed-by: Henrik Boström <hbos@webrtc.org>
> > Cr-Commit-Position: refs/heads/main@{#39760}
>
> Bug: webrtc:11547
> Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
> Auto-Submit: Andrey Logvin <landrey@webrtc.org>
> Owners-Override: Andrey Logvin <landrey@webrtc.org>
> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39764}

Bug: webrtc:11547
Change-Id: I47dfa7e7168be0cd2faab4f8f3ebf110c3728af5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300360
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39786}
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 623a153..18d464b 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -15,7 +15,6 @@
 #include <string>
 #include <utility>
 
-#include "absl/cleanup/cleanup.h"
 #include "media/sctp/sctp_transport_internal.h"
 #include "pc/proxy.h"
 #include "rtc_base/checks.h"
@@ -38,8 +37,8 @@
 // Define proxy for DataChannelInterface.
 BEGIN_PROXY_MAP(DataChannel)
 PROXY_PRIMARY_THREAD_DESTRUCTOR()
-PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
-PROXY_METHOD0(void, UnregisterObserver)
+BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
+BYPASS_PROXY_METHOD0(void, UnregisterObserver)
 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
@@ -50,20 +49,18 @@
 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
 // Can't bypass the proxy since the id may change.
-PROXY_CONSTMETHOD0(int, id)
+PROXY_SECONDARY_CONSTMETHOD0(int, id)
 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
-PROXY_CONSTMETHOD0(DataState, state)
-PROXY_CONSTMETHOD0(RTCError, error)
-PROXY_CONSTMETHOD0(uint32_t, messages_sent)
-PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
-PROXY_CONSTMETHOD0(uint32_t, messages_received)
-PROXY_CONSTMETHOD0(uint64_t, bytes_received)
-PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
-PROXY_METHOD0(void, Close)
-// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
-PROXY_METHOD1(bool, Send, const DataBuffer&)
+BYPASS_PROXY_CONSTMETHOD0(DataState, state)
+BYPASS_PROXY_CONSTMETHOD0(RTCError, error)
+PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
+PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
+PROXY_SECONDARY_METHOD0(void, Close)
+PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
 END_PROXY_MAP(DataChannel)
-
 }  // namespace
 
 InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
@@ -142,6 +139,138 @@
   used_sids_.erase(sid);
 }
 
+// A DataChannelObserver implementation that offers backwards compatibility with
+// implementations that aren't yet ready to be called back on the network
+// thread. This implementation posts events to the signaling thread where
+// events are delivered.
+// In the class, and together with the `SctpDataChannel` implementation, there's
+// special handling for the `state()` property whereby if that property is
+// queried on the channel object while inside an event callback, we return
+// the state that was active at the time the event was issued. This is to avoid
+// a problem with calling the `state()` getter on the proxy, which would do
+// a blocking call to the network thread, effectively flushing operations on
+// the network thread that could cause the state to change and eventually return
+// a misleading or arguably, wrong, state value to the callback implementation.
+// As a future improvement to the ObserverAdapter, we could do the same for
+// other properties that need to be read on the network thread. Eventually
+// all implementations should expect to be called on the network thread though
+// and the ObserverAdapter no longer be necessary.
+class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
+ public:
+  explicit ObserverAdapter(
+      SctpDataChannel* channel,
+      rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety)
+      : channel_(channel), signaling_safety_(std::move(signaling_safety)) {}
+
+  bool IsInsideCallback() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    return cached_getters_ != nullptr;
+  }
+
+  DataChannelInterface::DataState cached_state() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    RTC_DCHECK(IsInsideCallback());
+    return cached_getters_->state();
+  }
+
+  RTCError cached_error() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    RTC_DCHECK(IsInsideCallback());
+    return cached_getters_->error();
+  }
+
+  void SetDelegate(DataChannelObserver* delegate) {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    delegate_ = delegate;
+    safety_.reset(PendingTaskSafetyFlag::CreateDetached());
+  }
+
+  static void DeleteOnSignalingThread(
+      std::unique_ptr<ObserverAdapter> observer) {
+    auto* signaling_thread = observer->signaling_thread();
+    if (!signaling_thread->IsCurrent())
+      signaling_thread->PostTask([observer = std::move(observer)]() {});
+  }
+
+ private:
+  class CachedGetters {
+   public:
+    explicit CachedGetters(ObserverAdapter* adapter)
+        : adapter_(adapter),
+          cached_state_(adapter_->channel_->state()),
+          cached_error_(adapter_->channel_->error()) {
+      RTC_DCHECK_RUN_ON(adapter->network_thread());
+    }
+
+    ~CachedGetters() {
+      if (!was_dropped_) {
+        RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
+        RTC_DCHECK_EQ(adapter_->cached_getters_, this);
+        adapter_->cached_getters_ = nullptr;
+      }
+    }
+
+    bool PrepareForCallback() {
+      RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
+      RTC_DCHECK(was_dropped_);
+      was_dropped_ = false;
+      adapter_->cached_getters_ = this;
+      return adapter_->delegate_ && adapter_->signaling_safety_->alive();
+    }
+
+    RTCError error() { return cached_error_; }
+    DataChannelInterface::DataState state() { return cached_state_; }
+
+   private:
+    ObserverAdapter* const adapter_;
+    bool was_dropped_ = true;
+    const DataChannelInterface::DataState cached_state_;
+    const RTCError cached_error_;
+  };
+
+  void OnStateChange() override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(
+        SafeTask(safety_.flag(),
+                 [this, cached_state = std::make_unique<CachedGetters>(this)] {
+                   RTC_DCHECK_RUN_ON(signaling_thread());
+                   if (cached_state->PrepareForCallback())
+                     delegate_->OnStateChange();
+                 }));
+  }
+
+  void OnMessage(const DataBuffer& buffer) override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(SafeTask(
+        safety_.flag(), [this, buffer = buffer,
+                         cached_state = std::make_unique<CachedGetters>(this)] {
+          RTC_DCHECK_RUN_ON(signaling_thread());
+          if (cached_state->PrepareForCallback())
+            delegate_->OnMessage(buffer);
+        }));
+  }
+
+  void OnBufferedAmountChange(uint64_t sent_data_size) override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(SafeTask(
+        safety_.flag(), [this, sent_data_size,
+                         cached_state = std::make_unique<CachedGetters>(this)] {
+          RTC_DCHECK_RUN_ON(signaling_thread());
+          if (cached_state->PrepareForCallback())
+            delegate_->OnBufferedAmountChange(sent_data_size);
+        }));
+  }
+
+  rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; }
+  rtc::Thread* network_thread() const { return channel_->network_thread_; }
+
+  DataChannelObserver* delegate_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
+  SctpDataChannel* const channel_;
+  ScopedTaskSafety safety_;
+  rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_;
+  CachedGetters* cached_getters_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
+};
+
 // static
 rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
     rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
@@ -158,10 +287,13 @@
 
 // static
 rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
-    rtc::scoped_refptr<SctpDataChannel> channel) {
+    rtc::scoped_refptr<SctpDataChannel> channel,
+    rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety) {
   // Copy thread params to local variables before `std::move()`.
   auto* signaling_thread = channel->signaling_thread_;
   auto* network_thread = channel->network_thread_;
+  channel->observer_adapter_ = std::make_unique<ObserverAdapter>(
+      channel.get(), std::move(signaling_safety));
   return DataChannelProxy::Create(signaling_thread, network_thread,
                                   std::move(channel));
 }
@@ -175,7 +307,6 @@
     rtc::Thread* network_thread)
     : signaling_thread_(signaling_thread),
       network_thread_(network_thread),
-      id_s_(config.id),
       id_n_(config.id),
       internal_id_(GenerateUniqueId()),
       label_(label),
@@ -208,18 +339,87 @@
 }
 
 SctpDataChannel::~SctpDataChannel() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  if (observer_adapter_)
+    ObserverAdapter::DeleteOnSignalingThread(std::move(observer_adapter_));
 }
 
 void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  observer_ = observer;
-  DeliverQueuedReceivedData();
+  // Note: at this point, we do not know on which thread we're being called
+  // from since this method bypasses the proxy. On Android in particular,
+  // registration methods are called from unknown threads.
+
+  // Check if we should set up an observer adapter that will make sure that
+  // callbacks are delivered on the signaling thread rather than directly
+  // on the network thread.
+  const auto* current_thread = rtc::Thread::Current();
+  // TODO(webrtc:11547): Eventually all DataChannelObserver implementations
+  // should be called on the network thread and IsOkToCallOnTheNetworkThread().
+  if (!observer->IsOkToCallOnTheNetworkThread()) {
+    RTC_LOG(LS_WARNING) << "DataChannelObserver - adapter needed";
+    auto prepare_observer = [&]() {
+      RTC_DCHECK(observer_adapter_) << "CreateProxy hasn't been called";
+      observer_adapter_->SetDelegate(observer);
+      return observer_adapter_.get();
+    };
+    // Instantiate the adapter in the right context and then substitute the
+    // observer pointer the SctpDataChannel will call back on, with the adapter.
+    if (signaling_thread_ == current_thread) {
+      observer = prepare_observer();
+    } else {
+      observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
+    }
+  }
+
+  // Now do the observer registration on the network thread.
+  auto register_observer = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    observer_ = observer;
+    DeliverQueuedReceivedData();
+  };
+
+  if (network_thread_ == current_thread) {
+    register_observer();
+  } else {
+    network_thread_->BlockingCall(std::move(register_observer));
+  }
 }
 
 void SctpDataChannel::UnregisterObserver() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  observer_ = nullptr;
+  // Note: As with `RegisterObserver`, the proxy is being bypassed.
+  const auto* current_thread = rtc::Thread::Current();
+  // Callers must not be invoking the unregistration from the network thread
+  // (assuming a multi-threaded environment where we have a dedicated network
+  // thread). That would indicate non-network related work happening on the
+  // network thread or that unregistration is being done from within a callback
+  // (without unwinding the stack, which is a requirement).
+  // The network thread is not allowed to make blocking calls to the signaling
+  // thread, so that would blow up if attempted. Since we support an adapter
+  // for observers that are not safe to call on the network thread, we do
+  // need to check+free it on the signaling thread.
+  RTC_DCHECK(current_thread != network_thread_ ||
+             network_thread_ == signaling_thread_);
+
+  auto unregister_observer = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    observer_ = nullptr;
+  };
+
+  if (current_thread == network_thread_) {
+    unregister_observer();
+  } else {
+    network_thread_->BlockingCall(std::move(unregister_observer));
+  }
+
+  auto clear_observer = [&]() {
+    if (observer_adapter_)
+      observer_adapter_->SetDelegate(nullptr);
+  };
+
+  if (current_thread != signaling_thread_) {
+    signaling_thread_->BlockingCall(std::move(clear_observer));
+  } else {
+    clear_observer();
+  }
 }
 
 std::string SctpDataChannel::label() const {
@@ -261,8 +461,11 @@
 }
 
 int SctpDataChannel::id() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return id_s_.stream_id_int();
+  RTC_DCHECK_RUN_ON(network_thread_);
+  // TODO(tommi): Once an ID has been assigned, it won't change (can be
+  // considered const). We could do special handling of this and allow bypassing
+  // the proxy so that we can return a valid id without thread hopping.
+  return id_n_.stream_id_int();
 }
 
 Priority SctpDataChannel::priority() const {
@@ -270,12 +473,12 @@
 }
 
 uint64_t SctpDataChannel::buffered_amount() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return queued_send_data_.byte_count();
 }
 
 void SctpDataChannel::Close() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   if (state_ == kClosing || state_ == kClosed)
     return;
   SetState(kClosing);
@@ -284,40 +487,70 @@
 }
 
 SctpDataChannel::DataState SctpDataChannel::state() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return state_;
+  // Note: The proxy is bypassed for the `state()` accessor. This is to allow
+  // observer callbacks to query what the new state is from within a state
+  // update notification without having to do a blocking call to the network
+  // thread from within a callback. This also makes it so that the returned
+  // state is guaranteed to be the new state that provoked the state change
+  // notification, whereby a blocking call to the network thread might end up
+  // getting put behind other messages on the network thread and eventually
+  // fetch a different state value (since pending messages might cause the
+  // state to change in the meantime).
+  const auto* current_thread = rtc::Thread::Current();
+  if (current_thread == signaling_thread_ && observer_adapter_ &&
+      observer_adapter_->IsInsideCallback()) {
+    return observer_adapter_->cached_state();
+  }
+
+  auto return_state = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    return state_;
+  };
+
+  return current_thread == network_thread_
+             ? return_state()
+             : network_thread_->BlockingCall(std::move(return_state));
 }
 
 RTCError SctpDataChannel::error() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return error_;
+  const auto* current_thread = rtc::Thread::Current();
+  if (current_thread == signaling_thread_ && observer_adapter_ &&
+      observer_adapter_->IsInsideCallback()) {
+    return observer_adapter_->cached_error();
+  }
+
+  auto return_error = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    return error_;
+  };
+
+  return current_thread == network_thread_
+             ? return_error()
+             : network_thread_->BlockingCall(std::move(return_error));
 }
 
 uint32_t SctpDataChannel::messages_sent() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return messages_sent_;
 }
 
 uint64_t SctpDataChannel::bytes_sent() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return bytes_sent_;
 }
 
 uint32_t SctpDataChannel::messages_received() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return messages_received_;
 }
 
 uint64_t SctpDataChannel::bytes_received() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return bytes_received_;
 }
 
 bool SctpDataChannel::Send(const DataBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
-  // thread. Bring buffer management etc to the network thread and keep the
-  // operational state management on the signaling thread.
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (state_ != kOpen) {
     return false;
@@ -335,25 +568,17 @@
   return true;
 }
 
-void SctpDataChannel::SetSctpSid_s(StreamId sid) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  RTC_DCHECK(!id_s_.HasValue());
-  RTC_DCHECK(sid.HasValue());
-  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
-  RTC_DCHECK_EQ(state_, kConnecting);
-
-  id_s_ = sid;
-}
-
 void SctpDataChannel::SetSctpSid_n(StreamId sid) {
   RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DCHECK(!id_n_.HasValue());
   RTC_DCHECK(sid.HasValue());
+  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
+  RTC_DCHECK_EQ(state_, kConnecting);
   id_n_ = sid;
 }
 
 void SctpDataChannel::OnClosingProcedureStartedRemotely() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   if (state_ != kClosing && state_ != kClosed) {
     // Don't bother sending queued data since the side that initiated the
     // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
@@ -369,7 +594,7 @@
 }
 
 void SctpDataChannel::OnClosingProcedureComplete() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   // If the closing procedure is complete, we should have finished sending
   // all pending data and transitioned to kClosing already.
   RTC_DCHECK_EQ(state_, kClosing);
@@ -378,12 +603,12 @@
 }
 
 void SctpDataChannel::OnTransportChannelCreated() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-
+  RTC_DCHECK_RUN_ON(network_thread_);
   connected_to_transport_ = true;
 }
 
 void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   // The SctpTransport is unusable, which could come from multiple reasons:
   // - the SCTP m= section was rejected
   // - the DTLS transport is closed
@@ -392,7 +617,7 @@
 }
 
 DataChannelStats SctpDataChannel::GetStats() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   DataChannelStats stats{internal_id_,        id(),         label(),
                          protocol(),          state(),      messages_sent(),
                          messages_received(), bytes_sent(), bytes_received()};
@@ -401,25 +626,25 @@
 
 void SctpDataChannel::OnDataReceived(DataMessageType type,
                                      const rtc::CopyOnWriteBuffer& payload) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (type == DataMessageType::kControl) {
     if (handshake_state_ != kHandshakeWaitingForAck) {
       // Ignore it if we are not expecting an ACK message.
       RTC_LOG(LS_WARNING)
           << "DataChannel received unexpected CONTROL message, sid = "
-          << id_s_.stream_id_int();
+          << id_n_.stream_id_int();
       return;
     }
     if (ParseDataChannelOpenAckMessage(payload)) {
       // We can send unordered as soon as we receive the ACK message.
       handshake_state_ = kHandshakeReady;
       RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
-                       << id_s_.stream_id_int();
+                       << id_n_.stream_id_int();
     } else {
       RTC_LOG(LS_WARNING)
           << "DataChannel failed to parse OPEN_ACK message, sid = "
-          << id_s_.stream_id_int();
+          << id_n_.stream_id_int();
     }
     return;
   }
@@ -428,7 +653,7 @@
              type == DataMessageType::kText);
 
   RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
-                       << id_s_.stream_id_int();
+                       << id_n_.stream_id_int();
   // We can send unordered as soon as we receive any DATA message since the
   // remote side must have received the OPEN (and old clients do not send
   // OPEN_ACK).
@@ -459,7 +684,7 @@
 }
 
 void SctpDataChannel::OnTransportReady() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   // TODO(bugs.webrtc.org/11547): The transport is configured inside
   // `PeerConnection::SetupDataChannelTransport_n`, which results in
@@ -472,6 +697,7 @@
   // be on for the below `Send*` calls, which currently do a BlockingCall
   // from the signaling thread to the network thread.
   RTC_DCHECK(connected_to_transport_);
+  RTC_DCHECK(id_n_.HasValue());
 
   SendQueuedControlMessages();
   SendQueuedDataMessages();
@@ -480,7 +706,7 @@
 }
 
 void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (state_ == kClosed) {
     return;
@@ -501,13 +727,14 @@
 
 void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
     const std::string& message) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
   error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
   CloseAbruptlyWithError(std::move(error));
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::UpdateState() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   // UpdateState determines what to do from a few state variables. Include
   // all conditions required for each state transition here for
   // clarity. OnTransportReady(true) will send any queued data and then invoke
@@ -535,7 +762,7 @@
           DeliverQueuedReceivedData();
         }
       } else {
-        RTC_DCHECK(!id_s_.HasValue());
+        RTC_DCHECK(!id_n_.HasValue());
       }
       break;
     }
@@ -551,11 +778,9 @@
           // to complete; after calling RemoveSctpDataStream,
           // OnClosingProcedureComplete will end up called asynchronously
           // afterwards.
-          if (!started_closing_procedure_ && id_s_.HasValue()) {
+          if (!started_closing_procedure_ && id_n_.HasValue()) {
             started_closing_procedure_ = true;
-            network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
-              c->RemoveSctpDataStream(sid);
-            });
+            controller_->RemoveSctpDataStream(id_n_);
           }
         }
       } else {
@@ -572,8 +797,8 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SetState(DataState state) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == state) {
     return;
   }
@@ -587,8 +812,8 @@
     controller_->OnChannelStateChanged(this, state_);
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::DeliverQueuedReceivedData() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (!observer_) {
     return;
   }
@@ -601,8 +826,8 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedDataMessages() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (queued_send_data_.Empty()) {
     return;
   }
@@ -619,9 +844,9 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
                                       bool queue_if_blocked) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   SendDataParams send_params;
   if (!controller_) {
     return false;
@@ -641,7 +866,7 @@
   send_params.type =
       buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
 
-  RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
+  RTCError error = controller_->SendData(id_n_, send_params, buffer.data);
 
   if (error.ok()) {
     ++messages_sent_;
@@ -669,8 +894,8 @@
   return false;
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   size_t start_buffered_amount = queued_send_data_.byte_count();
   if (start_buffered_amount + buffer.size() >
       DataChannelInterface::MaxSendQueueSize()) {
@@ -681,8 +906,8 @@
   return true;
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedControlMessages() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   PacketQueue control_packets;
   control_packets.Swap(&queued_control_data_);
 
@@ -692,10 +917,10 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(connected_to_transport_);
-  RTC_DCHECK(id_s_.HasValue());
+  RTC_DCHECK(id_n_.HasValue());
   RTC_DCHECK(controller_);
 
   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@@ -708,10 +933,10 @@
   send_params.ordered = ordered_ || is_open_message;
   send_params.type = DataMessageType::kControl;
 
-  RTCError err = controller_->SendData(id_s_, send_params, buffer);
+  RTCError err = controller_->SendData(id_n_, send_params, buffer);
   if (err.ok()) {
     RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
-                         << id_s_.stream_id_int();
+                         << id_n_.stream_id_int();
 
     if (handshake_state_ == kHandshakeShouldSendAck) {
       handshake_state_ = kHandshakeReady;
@@ -735,10 +960,4 @@
   g_unique_id = new_value;
 }
 
-SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
-    DataChannelInterface* channel) {
-  return static_cast<SctpDataChannel*>(
-      static_cast<DataChannelProxy*>(channel)->internal());
-}
-
 }  // namespace webrtc