Revert "[DataChannel] Send and receive packets on the network thread."

This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.

Reason for revert: Speculative revert, may be breaking downstream project

Original change's description:
> [DataChannel] Send and receive packets on the network thread.
>
> This updates sctp channels, including work that happens between the
> data channel controller and the transport, to run on the network
> thread. Previously all network traffic related to data channels was
> routed through the signaling thread before going to either the network
> thread or the caller's thread (e.g. js thread in chrome). Now the
> calls can go straight from the network thread to the JS thread with
> enabling a special flag on the observer (see below) and similarly
> calls to send data, involve 2 threads instead of 3.
>
> * Custom data channel observer adapter implementation that
>   maintains compatibility with existing observer implementations in
>   that notifications are delivered on the signaling thread.
>   The adapter can be explicitly disabled for implementations that
>   want to optimize the callback path and promise to not block the
>   network thread.
> * Remove the signaling thread copy of data channels in the controller.
> * Remove several PostTask operations that were needed to keep things
>   in sync (but the need has gone away).
> * Update tests for the controller to consistently call
>   TeardownDataChannelTransport_n to match with production.
> * Update stats collectors (current and legacy) to fetch the data
>   channel stats on the network thread where they're maintained.
> * Remove the AsyncChannelCloseTeardown test since the async teardown
>   step has gone away.
> * Remove `sid_s` in the channel code since we only need the network
>   state now.
> * For the custom observer support (with and without data adapter) and
>   maintain compatibility with existing implementations, added a new
>   proxy macro that allows an implementation to selectively provide
>   its own implementation without being proxied. This is used for
>   registering/unregistering a data channel observer.
> * Update the data channel proxy to map most methods to the network
>   thread, avoiding the interim jump to the signaling thread.
> * Update a plethora of thread checkers from signaling to network.
>
> Bug: webrtc:11547
> Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Reviewed-by: Henrik Boström <hbos@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39760}

Bug: webrtc:11547
Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
Auto-Submit: Andrey Logvin <landrey@webrtc.org>
Owners-Override: Andrey Logvin <landrey@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39764}
diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h
index 35ef8e4..4f74918 100644
--- a/api/data_channel_interface.h
+++ b/api/data_channel_interface.h
@@ -100,17 +100,6 @@
   // The data channel's buffered_amount has changed.
   virtual void OnBufferedAmountChange(uint64_t sent_data_size) {}
 
-  // Override this to get callbacks directly on the network thread.
-  // An implementation that does that must not block the network thread
-  // but rather only use the callback to trigger asynchronous processing
-  // elsewhere as a result of the notification.
-  // The default return value, `false`, means that notifications will be
-  // delivered on the signaling thread associated with the peerconnection
-  // instance.
-  // TODO(webrtc:11547): Eventually all DataChannelObserver implementations
-  // should be called on the network thread and this method removed.
-  virtual bool IsOkToCallOnTheNetworkThread() { return false; }
-
  protected:
   virtual ~DataChannelObserver() = default;
 };
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 20d5fe9..36e8be1 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -29,15 +29,8 @@
 }
 
 bool DataChannelController::HasDataChannelsForTest() const {
-  auto has_channels = [&] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    return !sctp_data_channels_n_.empty();
-  };
-
-  if (network_thread()->IsCurrent())
-    return has_channels();
-
-  return network_thread()->BlockingCall(std::move(has_channels));
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return !sctp_data_channels_.empty();
 }
 
 bool DataChannelController::HasUsedDataChannels() const {
@@ -73,15 +66,11 @@
 void DataChannelController::OnChannelStateChanged(
     SctpDataChannel* channel,
     DataChannelInterface::DataState state) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   if (state == DataChannelInterface::DataState::kClosed)
     OnSctpDataChannelClosed(channel);
 
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(),
-               [this, channel_id = channel->internal_id(), state = state] {
-                 pc_->OnSctpDataChannelStateChanged(channel_id, state);
-               }));
+  pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
 }
 
 void DataChannelController::OnDataReceived(
@@ -93,22 +82,27 @@
   if (HandleOpenMessage_n(channel_id, type, buffer))
     return;
 
-  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
-    return c->sid_n().stream_id_int() == channel_id;
-  });
-
-  if (it != sctp_data_channels_n_.end())
-    (*it)->OnDataReceived(type, buffer);
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): The data being received should be
+        // delivered on the network thread.
+        auto it = FindChannel(StreamId(channel_id));
+        if (it != sctp_data_channels_.end())
+          (*it)->OnDataReceived(type, buffer);
+      }));
 }
 
 void DataChannelController::OnChannelClosing(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
-  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
-    return c->sid_n().stream_id_int() == channel_id;
-  });
-
-  if (it != sctp_data_channels_n_.end())
-    (*it)->OnClosingProcedureStartedRemotely();
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, channel_id] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): Should run on the network thread.
+        auto it = FindChannel(StreamId(channel_id));
+        if (it != sctp_data_channels_.end())
+          (*it)->OnClosingProcedureStartedRemotely();
+      }));
 }
 
 void DataChannelController::OnChannelClosed(int channel_id) {
@@ -118,44 +112,48 @@
   auto it = absl::c_find_if(sctp_data_channels_n_,
                             [&](const auto& c) { return c->sid_n() == sid; });
 
-  if (it != sctp_data_channels_n_.end()) {
-    rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+  if (it != sctp_data_channels_n_.end())
     sctp_data_channels_n_.erase(it);
-    channel->OnClosingProcedureComplete();
-  }
+
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    auto it = FindChannel(sid);
+    // Remove the channel from our list, close it and free up resources.
+    if (it != sctp_data_channels_.end()) {
+      rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+      // Note: this causes OnSctpDataChannelClosed() to not do anything
+      // when called from within `OnClosingProcedureComplete`.
+      sctp_data_channels_.erase(it);
+
+      channel->OnClosingProcedureComplete();
+    }
+  }));
 }
 
 void DataChannelController::OnReadyToSend() {
   RTC_DCHECK_RUN_ON(network_thread());
-  auto copy = sctp_data_channels_n_;
-  for (const auto& channel : copy) {
-    if (channel->sid_n().HasValue()) {
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    auto copy = sctp_data_channels_;
+    for (const auto& channel : copy)
       channel->OnTransportReady();
-    } else {
-      // This happens for role==SSL_SERVER channels when we get notified by
-      // the transport *before* the SDP code calls `AllocateSctpSids` to
-      // trigger assignment of sids. In this case OnTransportReady() will be
-      // called from within `AllocateSctpSids` below.
-      RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
-    }
-  }
+  }));
 }
 
 void DataChannelController::OnTransportClosed(RTCError error) {
   RTC_DCHECK_RUN_ON(network_thread());
-  // This loop will close all data channels and trigger a callback to
-  // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
-  // we create a local copy while we do the fan-out.
-  auto copy = sctp_data_channels_n_;
-  for (const auto& channel : copy)
-    channel->OnTransportChannelClosed(error);
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(), [this, error] {
+        RTC_DCHECK_RUN_ON(signaling_thread());
+        OnTransportChannelClosed(error);
+      }));
 }
 
 void DataChannelController::SetupDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
 
   // There's a new data channel transport.  This needs to be signaled to the
-  // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
+  // `sctp_data_channels_` so that they can reopen and reconnect.  This is
   // necessary when bundling is applied.
   NotifyDataChannelsOfTransportCreated();
 }
@@ -167,12 +165,11 @@
 
 void DataChannelController::TeardownDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
-  if (data_channel_transport_) {
-    data_channel_transport_->SetDataSink(nullptr);
-    set_data_channel_transport(nullptr);
+  if (data_channel_transport()) {
+    data_channel_transport()->SetDataSink(nullptr);
   }
+  set_data_channel_transport(nullptr);
   sctp_data_channels_n_.clear();
-  weak_factory_.InvalidateWeakPtrs();
 }
 
 void DataChannelController::OnTransportChanged(
@@ -188,7 +185,7 @@
       new_data_channel_transport->SetDataSink(this);
 
       // There's a new data channel transport.  This needs to be signaled to the
-      // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
+      // `sctp_data_channels_` so that they can reopen and reconnect.  This is
       // necessary when bundling is applied.
       NotifyDataChannelsOfTransportCreated();
     }
@@ -197,10 +194,10 @@
 
 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
     const {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   std::vector<DataChannelStats> stats;
-  stats.reserve(sctp_data_channels_n_.size());
-  for (const auto& channel : sctp_data_channels_n_)
+  stats.reserve(sctp_data_channels_.size());
+  for (const auto& channel : sctp_data_channels_)
     stats.push_back(channel->GetStats());
   return stats;
 }
@@ -222,38 +219,28 @@
                         << channel_id;
   } else {
     config.open_handshake_role = InternalDataChannelInit::kAcker;
-    auto channel_or_error = CreateDataChannel(label, config);
-    if (channel_or_error.ok()) {
-      signaling_thread()->PostTask(SafeTask(
-          signaling_safety_.flag(),
-          [this, channel = channel_or_error.MoveValue(),
-           ready_to_send = data_channel_transport_->IsReadyToSend()] {
-            RTC_DCHECK_RUN_ON(signaling_thread());
-            OnDataChannelOpenMessage(std::move(channel), ready_to_send);
-          }));
-    } else {
-      RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
-                        << ToString(channel_or_error.error().type());
-    }
+    signaling_thread()->PostTask(
+        SafeTask(signaling_safety_.flag(),
+                 [this, label = std::move(label), config = std::move(config)] {
+                   RTC_DCHECK_RUN_ON(signaling_thread());
+                   OnDataChannelOpenMessage(label, config);
+                 }));
   }
   return true;
 }
 
 void DataChannelController::OnDataChannelOpenMessage(
-    rtc::scoped_refptr<SctpDataChannel> channel,
-    bool ready_to_send) {
-  has_used_data_channels_ = true;
-  auto proxy = SctpDataChannel::CreateProxy(channel);
-
-  pc_->Observer()->OnDataChannel(proxy);
-  pc_->NoteDataAddedEvent();
-
-  if (ready_to_send) {
-    network_thread()->PostTask([channel = std::move(channel)] {
-      if (channel->state() != DataChannelInterface::DataState::kClosed)
-        channel->OnTransportReady();
-    });
+    const std::string& label,
+    const InternalDataChannelInit& config) {
+  auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
+  if (!channel_or_error.ok()) {
+    RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+                      << ToString(channel_or_error.error().type());
+    return;
   }
+
+  pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
+  pc_->NoteDataAddedEvent();
 }
 
 // RTC_RUN_ON(network_thread())
@@ -282,31 +269,6 @@
   return RTCError::OK();
 }
 
-// RTC_RUN_ON(network_thread())
-RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
-DataChannelController::CreateDataChannel(const std::string& label,
-                                         InternalDataChannelInit& config) {
-  StreamId sid(config.id);
-  RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
-  if (!err.ok())
-    return err;
-
-  // In case `sid` has changed. Update `config` accordingly.
-  config.id = sid.stream_id_int();
-
-  rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
-      weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
-      config, signaling_thread(), network_thread());
-  RTC_DCHECK(channel);
-  sctp_data_channels_n_.push_back(channel);
-
-  // If we have an id already, notify the transport.
-  if (sid.HasValue())
-    AddSctpDataStream(sid);
-
-  return channel;
-}
-
 RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
 DataChannelController::InternalCreateDataChannelWithProxy(
     const std::string& label,
@@ -321,25 +283,29 @@
   bool ready_to_send = false;
   InternalDataChannelInit new_config = config;
   StreamId sid(new_config.id);
+  auto weak_ptr = weak_factory_.GetWeakPtr();
+  RTC_DCHECK(weak_ptr);  // Associate with current thread.
   auto ret = network_thread()->BlockingCall(
       [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
         RTC_DCHECK_RUN_ON(network_thread());
-        auto channel = CreateDataChannel(label, new_config);
-        if (!channel.ok())
-          return channel;
+        RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
+        if (!err.ok())
+          return err;
+
+        // In case `sid` has changed. Update `new_config` accordingly.
+        new_config.id = sid.stream_id_int();
         ready_to_send =
             data_channel_transport_ && data_channel_transport_->IsReadyToSend();
-        if (ready_to_send) {
-          // If the transport is ready to send because the initial channel
-          // ready signal may have been sent before the DataChannel creation.
-          // This has to be done async because the upper layer objects (e.g.
-          // Chrome glue and WebKit) are not wired up properly until after
-          // `InternalCreateDataChannelWithProxy` returns.
-          network_thread()->PostTask([channel = channel.value()] {
-            if (channel->state() != DataChannelInterface::DataState::kClosed)
-              channel->OnTransportReady();
-          });
-        }
+
+        rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+            std::move(weak_ptr), label, data_channel_transport_ != nullptr,
+            new_config, signaling_thread(), network_thread()));
+        RTC_DCHECK(channel);
+        sctp_data_channels_n_.push_back(channel);
+
+        // If we have an id already, notify the transport.
+        if (sid.HasValue())
+          AddSctpDataStream(sid);
 
         return channel;
       });
@@ -347,71 +313,114 @@
   if (!ret.ok())
     return ret.MoveError();
 
+  if (ready_to_send) {
+    // Checks if the transport is ready to send because the initial channel
+    // ready signal may have been sent before the DataChannel creation.
+    // This has to be done async because the upper layer objects (e.g.
+    // Chrome glue and WebKit) are not wired up properly until after this
+    // function returns.
+    signaling_thread()->PostTask(
+        SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
+          if (channel->state() != DataChannelInterface::DataState::kClosed)
+            channel->OnTransportReady();
+        }));
+  }
+
+  sctp_data_channels_.push_back(ret.value());
   has_used_data_channels_ = true;
   return SctpDataChannel::CreateProxy(ret.MoveValue());
 }
 
 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
-  RTC_DCHECK_RUN_ON(network_thread());
-
-  const bool ready_to_send =
-      data_channel_transport_ && data_channel_transport_->IsReadyToSend();
+  RTC_DCHECK_RUN_ON(signaling_thread());
 
   std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
-  for (auto it = sctp_data_channels_n_.begin();
-       it != sctp_data_channels_n_.end();) {
-    if (!(*it)->sid_n().HasValue()) {
-      StreamId sid = sid_allocator_.AllocateSid(role);
-      if (sid.HasValue()) {
-        (*it)->SetSctpSid_n(sid);
-        AddSctpDataStream(sid);
-        if (ready_to_send) {
-          RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
-          (*it)->OnTransportReady();
+
+  network_thread()->BlockingCall([&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    for (auto it = sctp_data_channels_n_.begin();
+         it != sctp_data_channels_n_.end();) {
+      if (!(*it)->sid_n().HasValue()) {
+        StreamId sid = sid_allocator_.AllocateSid(role);
+        if (sid.HasValue()) {
+          (*it)->SetSctpSid_n(sid);
+          AddSctpDataStream(sid);
+          channels_to_update.push_back(std::make_pair((*it).get(), sid));
+        } else {
+          channels_to_close.push_back(std::move(*it));
+          it = sctp_data_channels_n_.erase(it);
+          continue;
         }
-        channels_to_update.push_back(std::make_pair((*it).get(), sid));
-      } else {
-        channels_to_close.push_back(std::move(*it));
-        it = sctp_data_channels_n_.erase(it);
-        continue;
       }
+      ++it;
     }
-    ++it;
-  }
+  });
 
   // Since closing modifies the list of channels, we have to do the actual
   // closing outside the loop.
   for (const auto& channel : channels_to_close) {
     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+    // The channel should now have been removed from sctp_data_channels_.
+    RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+                 return c.get() == channel.get();
+               }) == sctp_data_channels_.end());
+  }
+
+  for (auto& pair : channels_to_update) {
+    auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+      return c.get() == pair.first;
+    });
+    RTC_DCHECK(it != sctp_data_channels_.end());
+    (*it)->SetSctpSid_s(pair.second);
   }
 }
 
 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
-  RTC_DCHECK_RUN_ON(network_thread());
-  // After the closing procedure is done, it's safe to use this ID for
-  // another data channel.
-  if (channel->sid_n().HasValue()) {
-    sid_allocator_.ReleaseSid(channel->sid_n());
+  RTC_DCHECK_RUN_ON(signaling_thread());
+
+  network_thread()->BlockingCall([&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    // After the closing procedure is done, it's safe to use this ID for
+    // another data channel.
+    if (channel->sid_n().HasValue()) {
+      sid_allocator_.ReleaseSid(channel->sid_n());
+    }
+
+    auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+      return c.get() == channel;
+    });
+
+    if (it != sctp_data_channels_n_.end())
+      sctp_data_channels_n_.erase(it);
+  });
+
+  for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+       ++it) {
+    if (it->get() == channel) {
+      // Since this method is triggered by a signal from the DataChannel,
+      // we can't free it directly here; we need to free it asynchronously.
+      rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
+      sctp_data_channels_.erase(it);
+      signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
+                                            [release = std::move(release)] {}));
+      return;
+    }
   }
-  auto it = absl::c_find_if(sctp_data_channels_n_,
-                            [&](const auto& c) { return c.get() == channel; });
-  if (it != sctp_data_channels_n_.end())
-    sctp_data_channels_n_.erase(it);
 }
 
 void DataChannelController::OnTransportChannelClosed(RTCError error) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   // Use a temporary copy of the SCTP DataChannel list because the
   // DataChannel may callback to us and try to modify the list.
   // TODO(tommi): `OnTransportChannelClosed` is called from
   // `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
   // `TeardownDataChannelTransport_n` is called (but on the network thread) from
-  // the same function. We can now get rid of this function
-  // (OnTransportChannelClosed) and run this loop from within the
-  // TeardownDataChannelTransport_n callback.
+  // the same function. Once `sctp_data_channels_` moves to the network thread,
+  // we can get rid of this function (OnTransportChannelClosed) and run this
+  // loop from within the TeardownDataChannelTransport_n callback.
   std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
-  temp_sctp_dcs.swap(sctp_data_channels_n_);
+  temp_sctp_dcs.swap(sctp_data_channels_);
   for (const auto& channel : temp_sctp_dcs) {
     channel->OnTransportChannelClosed(error);
   }
@@ -435,10 +444,16 @@
     StreamId sid,
     const SendDataParams& params,
     const rtc::CopyOnWriteBuffer& payload) {
-  RTC_DCHECK_RUN_ON(network_thread());
+  // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+  // thread instead. Remove the BlockingCall() below and move associated state
+  // to the network thread.
+  RTC_DCHECK_RUN_ON(signaling_thread());
   RTC_DCHECK(data_channel_transport());
-  return data_channel_transport()->SendData(sid.stream_id_int(), params,
-                                            payload);
+
+  return network_thread()->BlockingCall([this, sid, params, payload] {
+    return data_channel_transport()->SendData(sid.stream_id_int(), params,
+                                              payload);
+  });
 }
 
 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@@ -448,8 +463,22 @@
   for (const auto& channel : sctp_data_channels_n_) {
     if (channel->sid_n().HasValue())
       AddSctpDataStream(channel->sid_n());
-    channel->OnTransportChannelCreated();
   }
+
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    for (const auto& channel : sctp_data_channels_) {
+      channel->OnTransportChannelCreated();
+    }
+  }));
+}
+
+std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
+DataChannelController::FindChannel(StreamId stream_id) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+    return c->sid_s() == stream_id;
+  });
 }
 
 rtc::Thread* DataChannelController::network_thread() const {
diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h
index 074b1fe..fa6c13e 100644
--- a/pc/data_channel_controller.h
+++ b/pc/data_channel_controller.h
@@ -107,11 +107,6 @@
   rtc::Thread* signaling_thread() const;
 
  private:
-  // Creates a new SctpDataChannel object on the network thread.
-  RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> CreateDataChannel(
-      const std::string& label,
-      InternalDataChannelInit& config) RTC_RUN_ON(network_thread());
-
   // Parses and handles open messages.  Returns true if the message is an open
   // message and should be considered to be handled, false otherwise.
   bool HandleOpenMessage_n(int channel_id,
@@ -119,8 +114,8 @@
                            const rtc::CopyOnWriteBuffer& buffer)
       RTC_RUN_ON(network_thread());
   // Called when a valid data channel OPEN message is received.
-  void OnDataChannelOpenMessage(rtc::scoped_refptr<SctpDataChannel> channel,
-                                bool ready_to_send)
+  void OnDataChannelOpenMessage(const std::string& label,
+                                const InternalDataChannelInit& config)
       RTC_RUN_ON(signaling_thread());
 
   // Accepts a `StreamId` which may be pre-negotiated or unassigned. For
@@ -144,6 +139,9 @@
   // (calls OnTransportChannelCreated on the signaling thread).
   void NotifyDataChannelsOfTransportCreated();
 
+  std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator FindChannel(
+      StreamId stream_id);
+
   // Plugin transport used for data channels.  Pointer may be accessed and
   // checked from any thread, but the object may only be touched on the
   // network thread.
@@ -151,16 +149,21 @@
   // thread.
   DataChannelTransportInterface* data_channel_transport_ = nullptr;
   SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread());
+  std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_
+      RTC_GUARDED_BY(signaling_thread());
+  // TODO(bugs.webrtc.org/11547): This vector will eventually take over from
+  // `sctp_data_channels_`. While we're migrating away from thread hops
+  // between the signaling and network threads, we need both, so this is
+  // a temporary situation.
   std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_n_
       RTC_GUARDED_BY(network_thread());
   bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false;
 
   // Owning PeerConnection.
   PeerConnectionInternal* const pc_;
-  // The weak pointers must be dereferenced and invalidated on the network
+  // The weak pointers must be dereferenced and invalidated on the signalling
   // thread only.
-  rtc::WeakPtrFactory<DataChannelController> weak_factory_
-      RTC_GUARDED_BY(network_thread()){this};
+  rtc::WeakPtrFactory<DataChannelController> weak_factory_{this};
   ScopedTaskSafety signaling_safety_;
 };
 
diff --git a/pc/data_channel_controller_unittest.cc b/pc/data_channel_controller_unittest.cc
index fd3deae..9b66fac 100644
--- a/pc/data_channel_controller_unittest.cc
+++ b/pc/data_channel_controller_unittest.cc
@@ -131,6 +131,44 @@
   channel->Close();
 }
 
+TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) {
+  DataChannelControllerForTest dcc(pc_.get());
+  auto ret = dcc.InternalCreateDataChannelWithProxy(
+      "label", InternalDataChannelInit(DataChannelInit()));
+  ASSERT_TRUE(ret.ok());
+  auto channel = ret.MoveValue();
+  SctpDataChannel* inner_channel =
+      DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
+          channel.get());
+  // Grab a reference for testing purposes.
+  inner_channel->AddRef();
+
+  channel = nullptr;  // dcc still holds a reference to `channel`.
+  EXPECT_TRUE(dcc.HasDataChannelsForTest());
+
+  // Trigger a Close() for the channel. This will send events back to dcc,
+  // eventually reaching `OnSctpDataChannelClosed` where dcc removes
+  // the channel from the internal list of data channels, but does not release
+  // the reference synchronously since that reference might be the last one.
+  inner_channel->Close();
+  // Now there should be no tracked data channels.
+  EXPECT_FALSE(dcc.HasDataChannelsForTest());
+  // But there should be an async operation queued that still holds a reference.
+  // That means that the test reference, must not be the last one.
+  ASSERT_NE(inner_channel->Release(),
+            rtc::RefCountReleaseStatus::kDroppedLastRef);
+  // Grab a reference again (using the pointer is safe since the object still
+  // exists and we control the single-threaded environment manually).
+  inner_channel->AddRef();
+  // Now run the queued up async operations on the signaling (current) thread.
+  // This time, the reference formerly owned by dcc, should be release and the
+  // truly last reference is now held by the test.
+  run_loop_.Flush();
+  // Check that this is the last reference.
+  EXPECT_EQ(inner_channel->Release(),
+            rtc::RefCountReleaseStatus::kDroppedLastRef);
+}
+
 // Allocate the maximum number of data channels and then one more.
 // The last allocation should fail.
 TEST_F(DataChannelControllerTest, MaxChannels) {
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index 2582561..fe75d38 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -40,19 +40,18 @@
 
 class FakeDataChannelObserver : public DataChannelObserver {
  public:
-  FakeDataChannelObserver() {
-    // This implementation relies on the SctpDataChannel::ObserverAdapter
-    // implementation to post events to the signaling thread.
-    RTC_DCHECK(!IsOkToCallOnTheNetworkThread());
-  }
+  FakeDataChannelObserver()
+      : messages_received_(0),
+        on_state_change_count_(0),
+        on_buffered_amount_change_count_(0) {}
 
-  void OnStateChange() override { ++on_state_change_count_; }
+  void OnStateChange() { ++on_state_change_count_; }
 
-  void OnBufferedAmountChange(uint64_t previous_amount) override {
+  void OnBufferedAmountChange(uint64_t previous_amount) {
     ++on_buffered_amount_change_count_;
   }
 
-  void OnMessage(const DataBuffer& buffer) override { ++messages_received_; }
+  void OnMessage(const DataBuffer& buffer) { ++messages_received_; }
 
   size_t messages_received() const { return messages_received_; }
 
@@ -69,9 +68,9 @@
   }
 
  private:
-  size_t messages_received_ = 0u;
-  size_t on_state_change_count_ = 0u;
-  size_t on_buffered_amount_change_count_ = 0u;
+  size_t messages_received_;
+  size_t on_state_change_count_;
+  size_t on_buffered_amount_change_count_;
 };
 
 class SctpDataChannelTest : public ::testing::Test {
@@ -94,17 +93,11 @@
 
   void SetChannelReady() {
     controller_->set_transport_available(true);
-    StreamId sid(0);
-    network_thread_.BlockingCall([&]() {
-      RTC_DCHECK_RUN_ON(&network_thread_);
-      if (!inner_channel_->sid_n().HasValue()) {
-        inner_channel_->SetSctpSid_n(sid);
-        controller_->AddSctpDataStream(sid);
-      }
-      inner_channel_->OnTransportChannelCreated();
-    });
+    inner_channel_->OnTransportChannelCreated();
+    if (!inner_channel_->sid_s().HasValue()) {
+      SetChannelSid(inner_channel_, StreamId(0));
+    }
     controller_->set_ready_to_send(true);
-    run_loop_.Flush();
   }
 
   // TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController
@@ -115,10 +108,9 @@
   void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel,
                      StreamId sid) {
     RTC_DCHECK(sid.HasValue());
-    network_thread_.BlockingCall([&]() {
-      channel->SetSctpSid_n(sid);
-      controller_->AddSctpDataStream(sid);
-    });
+    network_thread_.BlockingCall(
+        [&]() { controller_->AddSctpDataStream(sid); });
+    channel->SetSctpSid_s(sid);
   }
 
   void AddObserver() {
@@ -154,13 +146,11 @@
 
   // Check the non-const part of the configuration.
   EXPECT_EQ(channel_->id(), init_.id);
-  network_thread_.BlockingCall(
-      [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); });
+  EXPECT_EQ(inner_channel_->sid_s(), StreamId());
 
   SetChannelReady();
   EXPECT_EQ(channel_->id(), 0);
-  network_thread_.BlockingCall(
-      [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); });
+  EXPECT_EQ(inner_channel_->sid_s(), StreamId(0));
 }
 
 // Verifies that the data channel is connected to the transport after creation.
@@ -168,15 +158,13 @@
   controller_->set_transport_available(true);
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init_);
-  EXPECT_TRUE(controller_->IsConnected(dc.get()));
 
+  EXPECT_TRUE(controller_->IsConnected(dc.get()));
   // The sid is not set yet, so it should not have added the streams.
-  StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
-  EXPECT_FALSE(controller_->IsStreamAdded(sid));
+  EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s()));
 
   SetChannelSid(dc, StreamId(0));
-  sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
-  EXPECT_TRUE(controller_->IsStreamAdded(sid));
+  EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s()));
 }
 
 // Tests the state of the data channel.
@@ -195,7 +183,7 @@
   channel_->Close();
   // The (simulated) transport close notifications runs on the network thread
   // and posts a completion notification to the signaling (current) thread.
-  // Allow that operation to complete before checking the state.
+  // Allow that ooperation to complete before checking the state.
   run_loop_.Flush();
   EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
   EXPECT_EQ(observer_->on_state_change_count(), 3u);
@@ -213,7 +201,6 @@
   EXPECT_TRUE(channel_->Send(buffer));
   size_t successful_send_count = 1;
 
-  run_loop_.Flush();
   EXPECT_EQ(0U, channel_->buffered_amount());
   EXPECT_EQ(successful_send_count,
             observer_->on_buffered_amount_change_count());
@@ -230,7 +217,6 @@
             observer_->on_buffered_amount_change_count());
 
   controller_->set_send_blocked(false);
-  run_loop_.Flush();
   successful_send_count += number_of_packets;
   EXPECT_EQ(0U, channel_->buffered_amount());
   EXPECT_EQ(successful_send_count,
@@ -351,9 +337,10 @@
   SetChannelReady();
   InternalDataChannelInit init;
   init.id = 1;
-  auto dc = webrtc::SctpDataChannel::CreateProxy(
-      controller_->CreateDataChannel("test1", init));
-  EXPECT_EQ(DataChannelInterface::kOpen, dc->state());
+  rtc::scoped_refptr<SctpDataChannel> dc =
+      controller_->CreateDataChannel("test1", init);
+  EXPECT_EQ(DataChannelInterface::kConnecting, dc->state());
+  EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000);
 }
 
 // Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK
@@ -365,23 +352,21 @@
   init.ordered = false;
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init);
-  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
 
   // Sends a message and verifies it's ordered.
   DataBuffer buffer("some data");
-  ASSERT_TRUE(proxy->Send(buffer));
+  ASSERT_TRUE(dc->Send(buffer));
   EXPECT_TRUE(controller_->last_send_data_params().ordered);
 
   // Emulates receiving an OPEN_ACK message.
   rtc::CopyOnWriteBuffer payload;
   WriteDataChannelOpenAckMessage(&payload);
-  network_thread_.BlockingCall(
-      [&] { dc->OnDataReceived(DataMessageType::kControl, payload); });
+  dc->OnDataReceived(DataMessageType::kControl, payload);
 
   // Sends another message and verifies it's unordered.
-  ASSERT_TRUE(proxy->Send(buffer));
+  ASSERT_TRUE(dc->Send(buffer));
   EXPECT_FALSE(controller_->last_send_data_params().ordered);
 }
 
@@ -394,17 +379,15 @@
   init.ordered = false;
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init);
-  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
 
   // Emulates receiving a DATA message.
   DataBuffer buffer("data");
-  network_thread_.BlockingCall(
-      [&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); });
+  dc->OnDataReceived(DataMessageType::kText, buffer.data);
 
   // Sends a message and verifies it's unordered.
-  ASSERT_TRUE(proxy->Send(buffer));
+  ASSERT_TRUE(dc->Send(buffer));
   EXPECT_FALSE(controller_->last_send_data_params().ordered);
 }
 
@@ -457,10 +440,7 @@
   AddObserver();
 
   DataBuffer buffer("abcd");
-  network_thread_.BlockingCall([&] {
-    inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
-  });
-  run_loop_.Flush();
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
   EXPECT_EQ(1U, observer_->messages_received());
 }
 
@@ -475,9 +455,8 @@
   SetChannelReady();
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", config);
-  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
   EXPECT_EQ(0, controller_->last_sid());
 }
 
@@ -501,10 +480,9 @@
   EXPECT_EQ(0U, channel_->bytes_received());
 
   // Receive three buffers while data channel isn't open.
-  network_thread_.BlockingCall([&] {
-    for (int i : {0, 1, 2})
-      inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
-  });
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data);
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data);
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data);
   EXPECT_EQ(0U, observer_->messages_received());
   EXPECT_EQ(0U, channel_->messages_received());
   EXPECT_EQ(0U, channel_->bytes_received());
@@ -518,11 +496,9 @@
   EXPECT_EQ(bytes_received, channel_->bytes_received());
 
   // Receive three buffers while open.
-  network_thread_.BlockingCall([&] {
-    for (int i : {3, 4, 5})
-      inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
-  });
-  run_loop_.Flush();
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data);
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data);
+  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data);
   bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size();
   EXPECT_EQ(6U, observer_->messages_received());
   EXPECT_EQ(6U, channel_->messages_received());
@@ -540,9 +516,8 @@
   SetChannelReady();
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", config);
-  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
 
   EXPECT_EQ(config.id, controller_->last_sid());
   EXPECT_EQ(DataMessageType::kControl,
@@ -579,8 +554,9 @@
     EXPECT_TRUE(channel_->Send(packet));
   }
 
-  // The sending buffer should be full, `Send()` returns false.
+  // The sending buffer shoul be full, send returns false.
   EXPECT_FALSE(channel_->Send(packet));
+
   EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
 }
 
@@ -604,12 +580,10 @@
   rtc::CopyOnWriteBuffer buffer(1024);
   memset(buffer.MutableData(), 0, buffer.size());
 
-  network_thread_.BlockingCall([&] {
-    // Receiving data without having an observer will overflow the buffer.
-    for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
-      inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
-    }
-  });
+  // Receiving data without having an observer will overflow the buffer.
+  for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
+    inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
+  }
   EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
   EXPECT_FALSE(channel_->error().ok());
   EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type());
@@ -630,8 +604,7 @@
 // Tests that a channel can be closed without being opened or assigned an sid.
 TEST_F(SctpDataChannelTest, NeverOpened) {
   controller_->set_transport_available(true);
-  network_thread_.BlockingCall(
-      [&] { inner_channel_->OnTransportChannelCreated(); });
+  inner_channel_->OnTransportChannelCreated();
   channel_->Close();
 }
 
@@ -661,8 +634,7 @@
   // transition to the "closed" state.
   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
   error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
-  network_thread_.BlockingCall(
-      [&] { inner_channel_->OnTransportChannelClosed(error); });
+  inner_channel_->OnTransportChannelClosed(error);
   controller_.reset(nullptr);
   EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
                  kDefaultTimeout);
@@ -682,8 +654,7 @@
   error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
   error.set_sctp_cause_code(
       static_cast<uint16_t>(cricket::SctpErrorCauseCode::kProtocolViolation));
-  network_thread_.BlockingCall(
-      [&] { inner_channel_->OnTransportChannelClosed(error); });
+  inner_channel_->OnTransportChannelClosed(error);
   controller_.reset(nullptr);
   EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
                  kDefaultTimeout);
diff --git a/pc/legacy_stats_collector.cc b/pc/legacy_stats_collector.cc
index 7ce85e4..6829e35 100644
--- a/pc/legacy_stats_collector.cc
+++ b/pc/legacy_stats_collector.cc
@@ -670,7 +670,7 @@
   // to fetch stats, then applies them on the signaling thread. See if we need
   // to do this synchronously or if updating the stats without blocking is safe.
   std::map<std::string, std::string> transport_names_by_mid =
-      ExtractSessionAndDataInfo();
+      ExtractSessionInfo();
 
   // TODO(tommi): All of these hop over to the worker thread to fetch
   // information.  We could post a task to run all of these and post
@@ -681,6 +681,7 @@
   ExtractBweInfo();
   ExtractMediaInfo(transport_names_by_mid);
   ExtractSenderInfo();
+  ExtractDataInfo();
   UpdateTrackReports();
 }
 
@@ -855,26 +856,19 @@
   return report;
 }
 
-std::map<std::string, std::string>
-LegacyStatsCollector::ExtractSessionAndDataInfo() {
-  TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo");
+std::map<std::string, std::string> LegacyStatsCollector::ExtractSessionInfo() {
+  TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo");
   RTC_DCHECK_RUN_ON(pc_->signaling_thread());
 
   SessionStats stats;
-  StatsCollection::Container data_report_collection;
   auto transceivers = pc_->GetTransceiversInternal();
   pc_->network_thread()->BlockingCall(
       [&, sctp_transport_name = pc_->sctp_transport_name(),
        sctp_mid = pc_->sctp_mid()]() mutable {
         stats = ExtractSessionInfo_n(
             transceivers, std::move(sctp_transport_name), std::move(sctp_mid));
-        StatsCollection data_reports;
-        ExtractDataInfo_n(&data_reports);
-        data_report_collection = data_reports.DetachCollection();
       });
 
-  reports_.MergeCollection(std::move(data_report_collection));
-
   ExtractSessionInfo_s(stats);
 
   return std::move(stats.transport_names_by_mid);
@@ -1298,8 +1292,8 @@
   }
 }
 
-void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) {
-  RTC_DCHECK_RUN_ON(pc_->network_thread());
+void LegacyStatsCollector::ExtractDataInfo() {
+  RTC_DCHECK_RUN_ON(pc_->signaling_thread());
 
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
 
@@ -1307,7 +1301,7 @@
   for (const auto& stats : data_stats) {
     StatsReport::Id id(StatsReport::NewTypedIntId(
         StatsReport::kStatsReportTypeDataChannel, stats.id));
-    StatsReport* report = reports->ReplaceOrAddNew(id);
+    StatsReport* report = reports_.ReplaceOrAddNew(id);
     report->set_timestamp(stats_gathering_started_);
     report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
     // Filter out the initial id (-1).
diff --git a/pc/legacy_stats_collector.h b/pc/legacy_stats_collector.h
index e905b39..cedd36c 100644
--- a/pc/legacy_stats_collector.h
+++ b/pc/legacy_stats_collector.h
@@ -165,13 +165,11 @@
                                        const StatsReport::Id& channel_report_id,
                                        const cricket::ConnectionInfo& info);
 
-  void ExtractDataInfo_n(StatsCollection* reports);
+  void ExtractDataInfo();
 
   // Returns the `transport_names_by_mid` member from the SessionStats as
-  // gathered and used to populate the stats. Contains one synchronous hop
-  // to the network thread to get this information along with querying data
-  // channel stats at the same time and populating `reports_`.
-  std::map<std::string, std::string> ExtractSessionAndDataInfo();
+  // gathered and used to populate the stats.
+  std::map<std::string, std::string> ExtractSessionInfo();
 
   void ExtractBweInfo();
   void ExtractMediaInfo(
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 82c5914..fdbd32b 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2286,7 +2286,7 @@
 }
 
 std::vector<DataChannelStats> PeerConnection::GetDataChannelStats() const {
-  RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK_RUN_ON(signaling_thread());
   return data_channel_controller_.GetDataChannelStats();
 }
 
diff --git a/pc/proxy.h b/pc/proxy.h
index ebe60c0..2be115f 100644
--- a/pc/proxy.h
+++ b/pc/proxy.h
@@ -450,24 +450,6 @@
     return c_->method();                     \
   }
 
-// Allows a custom implementation of a method where the otherwise proxied
-// implementation can do a more efficient, yet thread-safe, job than the proxy
-// can do by default or when more flexibility is needed than can be provided
-// by a proxy.
-// Note that calls to these methods should be expected to be made from unknown
-// threads.
-#define BYPASS_PROXY_METHOD0(r, method) \
-  r method() override {                 \
-    TRACE_BOILERPLATE(method);          \
-    return c_->method();                \
-  }
-
-// The 1 argument version of `BYPASS_PROXY_METHOD0`.
-#define BYPASS_PROXY_METHOD1(r, method, t1) \
-  r method(t1 a1) override {                \
-    TRACE_BOILERPLATE(method);              \
-    return c_->method(std::move(a1));       \
-  }
 }  // namespace webrtc
 
 #endif  //  PC_PROXY_H_
diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc
index 574e1cc..429e9d0 100644
--- a/pc/rtc_stats_collector.cc
+++ b/pc/rtc_stats_collector.cc
@@ -1499,6 +1499,7 @@
   RTC_DCHECK_RUN_ON(signaling_thread_);
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
 
+  ProduceDataChannelStats_s(timestamp, partial_report);
   ProduceMediaStreamStats_s(timestamp, partial_report);
   ProduceMediaStreamTrackStats_s(timestamp, partial_report);
   ProduceMediaSourceStats_s(timestamp, partial_report);
@@ -1518,8 +1519,6 @@
   // `network_report_event_` is reset before this method is invoked.
   network_report_ = RTCStatsReport::Create(timestamp);
 
-  ProduceDataChannelStats_n(timestamp, network_report_.get());
-
   std::set<std::string> transport_names;
   if (sctp_transport_name) {
     transport_names.emplace(std::move(*sctp_transport_name));
@@ -1654,10 +1653,10 @@
   }
 }
 
-void RTCStatsCollector::ProduceDataChannelStats_n(
+void RTCStatsCollector::ProduceDataChannelStats_s(
     Timestamp timestamp,
     RTCStatsReport* report) const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
   std::vector<DataChannelStats> data_stats = pc_->GetDataChannelStats();
   for (const auto& stats : data_stats) {
diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h
index 34962bf..ac0453f 100644
--- a/pc/rtc_stats_collector.h
+++ b/pc/rtc_stats_collector.h
@@ -186,7 +186,7 @@
       const std::map<std::string, CertificateStatsPair>& transport_cert_stats,
       RTCStatsReport* report) const;
   // Produces `RTCDataChannelStats`.
-  void ProduceDataChannelStats_n(Timestamp timestamp,
+  void ProduceDataChannelStats_s(Timestamp timestamp,
                                  RTCStatsReport* report) const;
   // Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`.
   void ProduceIceCandidateAndPairStats_n(
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 892eca9..623a153 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -38,8 +38,8 @@
 // Define proxy for DataChannelInterface.
 BEGIN_PROXY_MAP(DataChannel)
 PROXY_PRIMARY_THREAD_DESTRUCTOR()
-BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
-BYPASS_PROXY_METHOD0(void, UnregisterObserver)
+PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
+PROXY_METHOD0(void, UnregisterObserver)
 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
@@ -50,18 +50,20 @@
 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
 // Can't bypass the proxy since the id may change.
-PROXY_SECONDARY_CONSTMETHOD0(int, id)
+PROXY_CONSTMETHOD0(int, id)
 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
-BYPASS_PROXY_CONSTMETHOD0(DataState, state)
-PROXY_SECONDARY_CONSTMETHOD0(RTCError, error)
-PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
-PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
-PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
-PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
-PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
-PROXY_SECONDARY_METHOD0(void, Close)
-PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
+PROXY_CONSTMETHOD0(DataState, state)
+PROXY_CONSTMETHOD0(RTCError, error)
+PROXY_CONSTMETHOD0(uint32_t, messages_sent)
+PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
+PROXY_CONSTMETHOD0(uint32_t, messages_received)
+PROXY_CONSTMETHOD0(uint64_t, bytes_received)
+PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
+PROXY_METHOD0(void, Close)
+// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
+PROXY_METHOD1(bool, Send, const DataBuffer&)
 END_PROXY_MAP(DataChannel)
+
 }  // namespace
 
 InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
@@ -140,78 +142,6 @@
   used_sids_.erase(sid);
 }
 
-// A DataChannelObserver implementation that offers backwards compatibility with
-// implementations that aren't yet ready to be called back on the network
-// thread. This implementation posts events to the signaling thread where
-// events are delivered.
-// In the class, and together with the `SctpDataChannel` implementation, there's
-// special handling for the `state()` property whereby if that property is
-// queried on the channel object while inside an event callback, we return
-// the state that was active at the time the event was issued. This is to avoid
-// a problem with calling the `state()` getter on the proxy, which would do
-// a blocking call to the network thread, effectively flushing operations on
-// the network thread that could cause the state to change and eventually return
-// a misleading or arguably, wrong, state value to the callback implementation.
-// As a future improvement to the ObserverAdapter, we could do the same for
-// other properties that need to be read on the network thread. Eventually
-// all implementations should expect to be called on the network thread though
-// and the ObserverAdapter no longer be necessary.
-class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
- public:
-  explicit ObserverAdapter(DataChannelObserver* delegate,
-                           SctpDataChannel* channel)
-      : delegate_(delegate), channel_(channel) {}
-
-  bool IsInsideStateNotification() const {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    return inside_state_change_;
-  }
-
-  DataChannelInterface::DataState cached_state() const {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    RTC_DCHECK(IsInsideStateNotification());
-    return cached_state_;
-  }
-
- private:
-  void OnStateChange() override {
-    RTC_DCHECK_RUN_ON(network_thread());
-    signaling_thread()->PostTask(
-        SafeTask(safety_.flag(), [this, new_state = channel_->state()] {
-          RTC_DCHECK_RUN_ON(signaling_thread());
-          cached_state_ = new_state;
-          inside_state_change_ = true;
-          delegate_->OnStateChange();
-          inside_state_change_ = false;
-        }));
-  }
-
-  void OnMessage(const DataBuffer& buffer) override {
-    RTC_DCHECK_RUN_ON(network_thread());
-    signaling_thread()->PostTask(
-        SafeTask(safety_.flag(),
-                 [this, buffer = buffer] { delegate_->OnMessage(buffer); }));
-  }
-
-  void OnBufferedAmountChange(uint64_t sent_data_size) override {
-    RTC_DCHECK_RUN_ON(network_thread());
-    signaling_thread()->PostTask(
-        SafeTask(safety_.flag(), [this, sent_data_size] {
-          delegate_->OnBufferedAmountChange(sent_data_size);
-        }));
-  }
-
-  rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; }
-  rtc::Thread* network_thread() const { return channel_->network_thread_; }
-
-  DataChannelObserver* const delegate_;
-  SctpDataChannel* const channel_;
-  ScopedTaskSafety safety_;
-  bool inside_state_change_ RTC_GUARDED_BY(signaling_thread()) = false;
-  DataChannelInterface::DataState cached_state_
-      RTC_GUARDED_BY(signaling_thread()) = DataChannelInterface::kConnecting;
-};
-
 // static
 rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
     rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
@@ -245,6 +175,7 @@
     rtc::Thread* network_thread)
     : signaling_thread_(signaling_thread),
       network_thread_(network_thread),
+      id_s_(config.id),
       id_n_(config.id),
       internal_id_(GenerateUniqueId()),
       label_(label),
@@ -276,81 +207,19 @@
   }
 }
 
-SctpDataChannel::~SctpDataChannel() {}
+SctpDataChannel::~SctpDataChannel() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+}
 
 void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
-  // Note: at this point, we do not know on which thread we're being called
-  // from since this method bypasses the proxy. On Android in particular,
-  // registration methods are called from unknown threads.
-
-  // Check if we should set up an observer adapter that will make sure that
-  // callbacks are delivered on the signaling thread rather than directly
-  // on the network thread.
-  const auto* current_thread = rtc::Thread::Current();
-  // TODO(webrtc:11547): Eventually all DataChannelObserver implementations
-  // should be called on the network thread and IsOkToCallOnTheNetworkThread().
-  if (!observer->IsOkToCallOnTheNetworkThread()) {
-    auto prepare_observer = [&]() {
-      RTC_DCHECK(!observer_adapter_);
-      observer_adapter_ = std::make_unique<ObserverAdapter>(observer, this);
-      return observer_adapter_.get();
-    };
-    // Instantiate the adapter in the right context and then substitute the
-    // observer pointer the SctpDataChannel will call back on, with the adapter.
-    if (signaling_thread_ == current_thread) {
-      observer = prepare_observer();
-    } else {
-      observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
-    }
-  }
-
-  // Now do the observer registration on the network thread.
-  auto register_observer = [&] {
-    RTC_DCHECK_RUN_ON(network_thread_);
-    observer_ = observer;
-    DeliverQueuedReceivedData();
-  };
-
-  if (network_thread_ == current_thread) {
-    register_observer();
-  } else {
-    network_thread_->BlockingCall(std::move(register_observer));
-  }
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  observer_ = observer;
+  DeliverQueuedReceivedData();
 }
 
 void SctpDataChannel::UnregisterObserver() {
-  // Note: As with `RegisterObserver`, the proxy is being bypassed.
-  const auto* current_thread = rtc::Thread::Current();
-  // Callers must not be invoking the unregistration from the network thread
-  // (assuming a multi-threaded environment where we have a dedicated network
-  // thread). That would indicate non-network related work happening on the
-  // network thread or that unregistration is being done from within a callback
-  // (without unwinding the stack, which is a requirement).
-  // The network thread is not allowed to make blocking calls to the signaling
-  // thread, so that would blow up if attempted. Since we support an adapter
-  // for observers that are not safe to call on the network thread, we do
-  // need to check+free it on the signaling thread.
-  RTC_DCHECK(current_thread != network_thread_ ||
-             network_thread_ == signaling_thread_);
-
-  auto unregister_observer = [&] {
-    RTC_DCHECK_RUN_ON(network_thread_);
-    observer_ = nullptr;
-  };
-
-  if (current_thread == network_thread_) {
-    unregister_observer();
-  } else {
-    network_thread_->BlockingCall(std::move(unregister_observer));
-  }
-
-  auto clear_observer = [&]() { observer_adapter_.reset(); };
-
-  if (current_thread != signaling_thread_) {
-    signaling_thread_->BlockingCall(std::move(clear_observer));
-  } else {
-    clear_observer();
-  }
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  observer_ = nullptr;
 }
 
 std::string SctpDataChannel::label() const {
@@ -392,11 +261,8 @@
 }
 
 int SctpDataChannel::id() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
-  // TODO(tommi): Once an ID has been assigned, it won't change (can be
-  // considered const). We could do special handling of this and allow bypassing
-  // the proxy so that we can return a valid id without thread hopping.
-  return id_n_.stream_id_int();
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return id_s_.stream_id_int();
 }
 
 Priority SctpDataChannel::priority() const {
@@ -404,12 +270,12 @@
 }
 
 uint64_t SctpDataChannel::buffered_amount() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return queued_send_data_.byte_count();
 }
 
 void SctpDataChannel::Close() {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == kClosing || state_ == kClosed)
     return;
   SetState(kClosing);
@@ -418,58 +284,40 @@
 }
 
 SctpDataChannel::DataState SctpDataChannel::state() const {
-  // Note: The proxy is bypassed for the `state()` accessor. This is to allow
-  // observer callbacks to query what the new state is from within a state
-  // update notification without having to do a blocking call to the network
-  // thread from within a callback. This also makes it so that the returned
-  // state is guaranteed to be the new state that provoked the state change
-  // notification, whereby a blocking call to the network thread might end up
-  // getting put behind other messages on the network thread and eventually
-  // fetch a different state value (since pending messages might cause the
-  // state to change in the meantime).
-  const auto* current_thread = rtc::Thread::Current();
-  if (current_thread == signaling_thread_) {
-    if (observer_adapter_ && observer_adapter_->IsInsideStateNotification())
-      return observer_adapter_->cached_state();
-  }
-
-  auto return_state = [&] {
-    RTC_DCHECK_RUN_ON(network_thread_);
-    return state_;
-  };
-
-  return current_thread == network_thread_
-             ? return_state()
-             : network_thread_->BlockingCall(std::move(return_state));
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return state_;
 }
 
 RTCError SctpDataChannel::error() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return error_;
 }
 
 uint32_t SctpDataChannel::messages_sent() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return messages_sent_;
 }
 
 uint64_t SctpDataChannel::bytes_sent() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return bytes_sent_;
 }
 
 uint32_t SctpDataChannel::messages_received() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return messages_received_;
 }
 
 uint64_t SctpDataChannel::bytes_received() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return bytes_received_;
 }
 
 bool SctpDataChannel::Send(const DataBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
+  // thread. Bring buffer management etc to the network thread and keep the
+  // operational state management on the signaling thread.
 
   if (state_ != kOpen) {
     return false;
@@ -487,17 +335,25 @@
   return true;
 }
 
+void SctpDataChannel::SetSctpSid_s(StreamId sid) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK(!id_s_.HasValue());
+  RTC_DCHECK(sid.HasValue());
+  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
+  RTC_DCHECK_EQ(state_, kConnecting);
+
+  id_s_ = sid;
+}
+
 void SctpDataChannel::SetSctpSid_n(StreamId sid) {
   RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DCHECK(!id_n_.HasValue());
   RTC_DCHECK(sid.HasValue());
-  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
-  RTC_DCHECK_EQ(state_, kConnecting);
   id_n_ = sid;
 }
 
 void SctpDataChannel::OnClosingProcedureStartedRemotely() {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ != kClosing && state_ != kClosed) {
     // Don't bother sending queued data since the side that initiated the
     // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
@@ -513,7 +369,7 @@
 }
 
 void SctpDataChannel::OnClosingProcedureComplete() {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   // If the closing procedure is complete, we should have finished sending
   // all pending data and transitioned to kClosing already.
   RTC_DCHECK_EQ(state_, kClosing);
@@ -522,12 +378,12 @@
 }
 
 void SctpDataChannel::OnTransportChannelCreated() {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+
   connected_to_transport_ = true;
 }
 
 void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
-  RTC_DCHECK_RUN_ON(network_thread_);
   // The SctpTransport is unusable, which could come from multiple reasons:
   // - the SCTP m= section was rejected
   // - the DTLS transport is closed
@@ -536,7 +392,7 @@
 }
 
 DataChannelStats SctpDataChannel::GetStats() const {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   DataChannelStats stats{internal_id_,        id(),         label(),
                          protocol(),          state(),      messages_sent(),
                          messages_received(), bytes_sent(), bytes_received()};
@@ -545,25 +401,25 @@
 
 void SctpDataChannel::OnDataReceived(DataMessageType type,
                                      const rtc::CopyOnWriteBuffer& payload) {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
 
   if (type == DataMessageType::kControl) {
     if (handshake_state_ != kHandshakeWaitingForAck) {
       // Ignore it if we are not expecting an ACK message.
       RTC_LOG(LS_WARNING)
           << "DataChannel received unexpected CONTROL message, sid = "
-          << id_n_.stream_id_int();
+          << id_s_.stream_id_int();
       return;
     }
     if (ParseDataChannelOpenAckMessage(payload)) {
       // We can send unordered as soon as we receive the ACK message.
       handshake_state_ = kHandshakeReady;
       RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
-                       << id_n_.stream_id_int();
+                       << id_s_.stream_id_int();
     } else {
       RTC_LOG(LS_WARNING)
           << "DataChannel failed to parse OPEN_ACK message, sid = "
-          << id_n_.stream_id_int();
+          << id_s_.stream_id_int();
     }
     return;
   }
@@ -572,7 +428,7 @@
              type == DataMessageType::kText);
 
   RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
-                       << id_n_.stream_id_int();
+                       << id_s_.stream_id_int();
   // We can send unordered as soon as we receive any DATA message since the
   // remote side must have received the OPEN (and old clients do not send
   // OPEN_ACK).
@@ -603,7 +459,7 @@
 }
 
 void SctpDataChannel::OnTransportReady() {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
 
   // TODO(bugs.webrtc.org/11547): The transport is configured inside
   // `PeerConnection::SetupDataChannelTransport_n`, which results in
@@ -616,7 +472,6 @@
   // be on for the below `Send*` calls, which currently do a BlockingCall
   // from the signaling thread to the network thread.
   RTC_DCHECK(connected_to_transport_);
-  RTC_DCHECK(id_n_.HasValue());
 
   SendQueuedControlMessages();
   SendQueuedDataMessages();
@@ -625,7 +480,7 @@
 }
 
 void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
-  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK_RUN_ON(signaling_thread_);
 
   if (state_ == kClosed) {
     return;
@@ -646,14 +501,13 @@
 
 void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
     const std::string& message) {
-  RTC_DCHECK_RUN_ON(network_thread_);
   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
   error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
   CloseAbruptlyWithError(std::move(error));
 }
 
-// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::UpdateState() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   // UpdateState determines what to do from a few state variables. Include
   // all conditions required for each state transition here for
   // clarity. OnTransportReady(true) will send any queued data and then invoke
@@ -681,7 +535,7 @@
           DeliverQueuedReceivedData();
         }
       } else {
-        RTC_DCHECK(!id_n_.HasValue());
+        RTC_DCHECK(!id_s_.HasValue());
       }
       break;
     }
@@ -697,9 +551,11 @@
           // to complete; after calling RemoveSctpDataStream,
           // OnClosingProcedureComplete will end up called asynchronously
           // afterwards.
-          if (!started_closing_procedure_ && id_n_.HasValue()) {
+          if (!started_closing_procedure_ && id_s_.HasValue()) {
             started_closing_procedure_ = true;
-            controller_->RemoveSctpDataStream(id_n_);
+            network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
+              c->RemoveSctpDataStream(sid);
+            });
           }
         }
       } else {
@@ -716,8 +572,8 @@
   }
 }
 
-// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SetState(DataState state) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == state) {
     return;
   }
@@ -731,8 +587,8 @@
     controller_->OnChannelStateChanged(this, state_);
 }
 
-// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::DeliverQueuedReceivedData() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (!observer_) {
     return;
   }
@@ -745,8 +601,8 @@
   }
 }
 
-// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedDataMessages() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (queued_send_data_.Empty()) {
     return;
   }
@@ -763,9 +619,9 @@
   }
 }
 
-// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
                                       bool queue_if_blocked) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   SendDataParams send_params;
   if (!controller_) {
     return false;
@@ -785,7 +641,7 @@
   send_params.type =
       buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
 
-  RTCError error = controller_->SendData(id_n_, send_params, buffer.data);
+  RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
 
   if (error.ok()) {
     ++messages_sent_;
@@ -813,8 +669,8 @@
   return false;
 }
 
-// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   size_t start_buffered_amount = queued_send_data_.byte_count();
   if (start_buffered_amount + buffer.size() >
       DataChannelInterface::MaxSendQueueSize()) {
@@ -825,8 +681,8 @@
   return true;
 }
 
-// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedControlMessages() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   PacketQueue control_packets;
   control_packets.Swap(&queued_control_data_);
 
@@ -836,10 +692,10 @@
   }
 }
 
-// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(connected_to_transport_);
-  RTC_DCHECK(id_n_.HasValue());
+  RTC_DCHECK(id_s_.HasValue());
   RTC_DCHECK(controller_);
 
   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@@ -852,10 +708,10 @@
   send_params.ordered = ordered_ || is_open_message;
   send_params.type = DataMessageType::kControl;
 
-  RTCError err = controller_->SendData(id_n_, send_params, buffer);
+  RTCError err = controller_->SendData(id_s_, send_params, buffer);
   if (err.ok()) {
     RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
-                         << id_n_.stream_id_int();
+                         << id_s_.stream_id_int();
 
     if (handshake_state_ == kHandshakeShouldSendAck) {
       handshake_state_ = kHandshakeReady;
diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h
index fcf1ffe..588b0cb 100644
--- a/pc/sctp_data_channel.h
+++ b/pc/sctp_data_channel.h
@@ -192,6 +192,7 @@
 
   // Sets the SCTP sid and adds to transport layer if not set yet. Should only
   // be called once.
+  void SetSctpSid_s(StreamId sid);
   void SetSctpSid_n(StreamId sid);
 
   // The remote side started the closing procedure by resetting its outgoing
@@ -215,6 +216,10 @@
   // stats purposes (see also `GetStats()`).
   int internal_id() const { return internal_id_; }
 
+  StreamId sid_s() const {
+    RTC_DCHECK_RUN_ON(signaling_thread_);
+    return id_s_;
+  }
   StreamId sid_n() const {
     RTC_DCHECK_RUN_ON(network_thread_);
     return id_n_;
@@ -234,8 +239,6 @@
   ~SctpDataChannel() override;
 
  private:
-  class ObserverAdapter;
-
   // The OPEN(_ACK) signaling state.
   enum HandshakeState {
     kHandshakeInit,
@@ -245,23 +248,21 @@
     kHandshakeReady
   };
 
-  void UpdateState() RTC_RUN_ON(network_thread_);
-  void SetState(DataState state) RTC_RUN_ON(network_thread_);
+  void UpdateState();
+  void SetState(DataState state);
 
-  void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
+  void DeliverQueuedReceivedData();
 
-  void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
-  bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
-      RTC_RUN_ON(network_thread_);
-  bool QueueSendDataMessage(const DataBuffer& buffer)
-      RTC_RUN_ON(network_thread_);
+  void SendQueuedDataMessages();
+  bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked);
+  bool QueueSendDataMessage(const DataBuffer& buffer);
 
-  void SendQueuedControlMessages() RTC_RUN_ON(network_thread_);
-  bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
-      RTC_RUN_ON(network_thread_);
+  void SendQueuedControlMessages();
+  bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
 
   rtc::Thread* const signaling_thread_;
   rtc::Thread* const network_thread_;
+  StreamId id_s_ RTC_GUARDED_BY(signaling_thread_);
   StreamId id_n_ RTC_GUARDED_BY(network_thread_);
   const int internal_id_;
   const std::string label_;
@@ -272,26 +273,25 @@
   const bool negotiated_;
   const bool ordered_;
 
-  DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr;
-  std::unique_ptr<ObserverAdapter> observer_adapter_;
-  DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting;
-  RTCError error_ RTC_GUARDED_BY(network_thread_);
-  uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0;
-  uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0;
-  uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0;
-  uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0;
+  DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr;
+  DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting;
+  RTCError error_ RTC_GUARDED_BY(signaling_thread_);
+  uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
+  uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
+  uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
+  uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
   rtc::WeakPtr<SctpDataChannelControllerInterface> controller_
-      RTC_GUARDED_BY(network_thread_);
-  HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) =
+      RTC_GUARDED_BY(signaling_thread_);
+  HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) =
       kHandshakeInit;
-  bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false;
+  bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false;
   // Did we already start the graceful SCTP closing procedure?
-  bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
+  bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
   // Control messages that always have to get sent out before any queued
   // data.
-  PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
-  PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
-  PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
+  PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
+  PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
+  PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
 };
 
 // Downcast a PeerConnectionInterface that points to a proxy object
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index 90a6cd2..c382d61 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -3268,16 +3268,20 @@
     return;
   }
 
-  absl::optional<rtc::SSLRole> guessed_role = GuessSslRole();
-  network_thread()->BlockingCall(
-      [&, data_channel_controller = data_channel_controller()] {
-        RTC_DCHECK_RUN_ON(network_thread());
-        absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
-        if (!role)
-          role = guessed_role;
-        if (role)
-          data_channel_controller->AllocateSctpSids(*role);
-      });
+  absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    return pc_->GetSctpSslRole_n();
+  });
+
+  if (!role) {
+    role = GuessSslRole();
+  }
+
+  if (role) {
+    // TODO(webrtc:11547): Make this call on the network thread too once
+    // `AllocateSctpSids` has been updated.
+    data_channel_controller()->AllocateSctpSids(*role);
+  }
 }
 
 absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GuessSslRole() const {
@@ -5113,13 +5117,13 @@
   RTC_DCHECK_RUN_ON(signaling_thread());
   const bool has_sctp = pc_->sctp_mid().has_value();
 
-  context_->network_thread()->BlockingCall(
-      [&, data_channel_controller = data_channel_controller()] {
-        RTC_DCHECK_RUN_ON(context_->network_thread());
-        if (has_sctp)
-          data_channel_controller->OnTransportChannelClosed(error);
-        pc_->TeardownDataChannelTransport_n();
-      });
+  if (has_sctp)
+    data_channel_controller()->OnTransportChannelClosed(error);
+
+  context_->network_thread()->BlockingCall([this] {
+    RTC_DCHECK_RUN_ON(context_->network_thread());
+    pc_->TeardownDataChannelTransport_n();
+  });
 
   if (has_sctp)
     pc_->ResetSctpDataMid();
diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h
index d1b41fc..26ecc31 100644
--- a/pc/test/fake_data_channel_controller.h
+++ b/pc/test/fake_data_channel_controller.h
@@ -29,31 +29,23 @@
         transport_available_(false),
         ready_to_send_(false),
         transport_error_(false) {}
-
-  ~FakeDataChannelController() override {
-    network_thread_->BlockingCall([&] {
-      RTC_DCHECK_RUN_ON(network_thread_);
-      weak_factory_.InvalidateWeakPtrs();
-    });
-  }
+  virtual ~FakeDataChannelController() {}
 
   rtc::WeakPtr<FakeDataChannelController> weak_ptr() {
-    RTC_DCHECK_RUN_ON(network_thread_);
     return weak_factory_.GetWeakPtr();
   }
 
   rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
       absl::string_view label,
       webrtc::InternalDataChannelInit init) {
+    rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
+    // Explicitly associate the weak ptr instance with the current thread to
+    // catch early any inappropriate referencing of it on the network thread.
+    RTC_CHECK(my_weak_ptr);
+
     rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
         network_thread_->BlockingCall([&]() {
           RTC_DCHECK_RUN_ON(network_thread_);
-          rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
-          // Explicitly associate the weak ptr instance with the current thread
-          // to catch early any inappropriate referencing of it on the network
-          // thread.
-          RTC_CHECK(my_weak_ptr);
-
           rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
               webrtc::SctpDataChannel::Create(
                   std::move(my_weak_ptr), std::string(label),
@@ -62,16 +54,17 @@
           if (transport_available_ && channel->sid_n().HasValue()) {
             AddSctpDataStream(channel->sid_n());
           }
-          if (ready_to_send_) {
-            network_thread_->PostTask([channel = channel] {
-              if (channel->state() !=
-                  webrtc::DataChannelInterface::DataState::kClosed) {
-                channel->OnTransportReady();
-              }
-            });
-          }
           return channel;
         });
+    if (ready_to_send_) {
+      signaling_thread_->PostTask(
+          SafeTask(signaling_safety_.flag(), [channel = channel] {
+            if (channel->state() !=
+                webrtc::DataChannelInterface::DataState::kClosed) {
+              channel->OnTransportReady();
+            }
+          }));
+    }
     connected_channels_.insert(channel.get());
     return channel;
   }
@@ -79,7 +72,6 @@
   webrtc::RTCError SendData(webrtc::StreamId sid,
                             const webrtc::SendDataParams& params,
                             const rtc::CopyOnWriteBuffer& payload) override {
-    RTC_DCHECK_RUN_ON(network_thread_);
     RTC_CHECK(ready_to_send_);
     RTC_CHECK(transport_available_);
     if (send_blocked_) {
@@ -108,14 +100,17 @@
     RTC_DCHECK_RUN_ON(network_thread_);
     RTC_CHECK(sid.HasValue());
     known_stream_ids_.erase(sid);
-    // Unlike the real SCTP transport, act like the closing procedure finished
-    // instantly.
-    auto it = absl::c_find_if(connected_channels_,
-                              [&](const auto* c) { return c->sid_n() == sid; });
-    // This path mimics the DCC's OnChannelClosed handler since the FDCC
-    // (this class) doesn't have a transport that would do that.
-    if (it != connected_channels_.end())
-      (*it)->OnClosingProcedureComplete();
+    signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+      // Unlike the real SCTP transport, act like the closing procedure finished
+      // instantly.
+      auto it = absl::c_find_if(connected_channels_, [&](const auto* c) {
+        return c->sid_s() == sid;
+      });
+      // This path mimics the DCC's OnChannelClosed handler since the FDCC
+      // (this class) doesn't have a transport that would do that.
+      if (it != connected_channels_.end())
+        (*it)->OnClosingProcedureComplete();
+    }));
   }
 
   void OnChannelStateChanged(
@@ -131,40 +126,36 @@
 
   // Set true to emulate the SCTP stream being blocked by congestion control.
   void set_send_blocked(bool blocked) {
-    network_thread_->BlockingCall([&]() {
-      send_blocked_ = blocked;
-      if (!blocked) {
-        RTC_CHECK(transport_available_);
-        // Make a copy since `connected_channels_` may change while
-        // OnTransportReady is called.
-        auto copy = connected_channels_;
-        for (webrtc::SctpDataChannel* ch : copy) {
-          ch->OnTransportReady();
-        }
+    send_blocked_ = blocked;
+    if (!blocked) {
+      RTC_CHECK(transport_available_);
+      // Make a copy since `connected_channels_` may change while
+      // OnTransportReady is called.
+      auto copy = connected_channels_;
+      for (webrtc::SctpDataChannel* ch : copy) {
+        ch->OnTransportReady();
       }
-    });
+    }
   }
 
   // Set true to emulate the transport channel creation, e.g. after
   // setLocalDescription/setRemoteDescription called with data content.
   void set_transport_available(bool available) {
-    network_thread_->BlockingCall([&]() { transport_available_ = available; });
+    transport_available_ = available;
   }
 
   // Set true to emulate the transport OnTransportReady signal when the
   // transport becomes writable for the first time.
   void set_ready_to_send(bool ready) {
     RTC_CHECK(transport_available_);
-    network_thread_->BlockingCall([&]() {
-      ready_to_send_ = ready;
-      if (ready) {
-        std::set<webrtc::SctpDataChannel*>::iterator it;
-        for (it = connected_channels_.begin(); it != connected_channels_.end();
-             ++it) {
-          (*it)->OnTransportReady();
-        }
+    ready_to_send_ = ready;
+    if (ready) {
+      std::set<webrtc::SctpDataChannel*>::iterator it;
+      for (it = connected_channels_.begin(); it != connected_channels_.end();
+           ++it) {
+        (*it)->OnTransportReady();
       }
-    });
+    }
   }
 
   void set_transport_error() { transport_error_ = true; }