Reland "[DataChannel] Send and receive packets on the network thread."

This reverts commit 7f16fcda0fd5bb625584b71311dd37b54c096136.

Reason for reland: Re-landing after addressing issues in downstream
code and hardening the ObserverAdapter from situations where attempted
usage of data channel proxies could occur after shutting down the
peer connection and terminating the network thread.

Original change's description:
> Revert "[DataChannel] Send and receive packets on the network thread."
>
> This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.
>
> Reason for revert: Speculative revert, may be breaking downstream project
>
> Original change's description:
> > [DataChannel] Send and receive packets on the network thread.
> >
> > This updates sctp channels, including work that happens between the
> > data channel controller and the transport, to run on the network
> > thread. Previously all network traffic related to data channels was
> > routed through the signaling thread before going to either the network
> > thread or the caller's thread (e.g. js thread in chrome). Now the
> > calls can go straight from the network thread to the JS thread with
> > enabling a special flag on the observer (see below) and similarly
> > calls to send data, involve 2 threads instead of 3.
> >
> > * Custom data channel observer adapter implementation that
> >   maintains compatibility with existing observer implementations in
> >   that notifications are delivered on the signaling thread.
> >   The adapter can be explicitly disabled for implementations that
> >   want to optimize the callback path and promise to not block the
> >   network thread.
> > * Remove the signaling thread copy of data channels in the controller.
> > * Remove several PostTask operations that were needed to keep things
> >   in sync (but the need has gone away).
> > * Update tests for the controller to consistently call
> >   TeardownDataChannelTransport_n to match with production.
> > * Update stats collectors (current and legacy) to fetch the data
> >   channel stats on the network thread where they're maintained.
> > * Remove the AsyncChannelCloseTeardown test since the async teardown
> >   step has gone away.
> > * Remove `sid_s` in the channel code since we only need the network
> >   state now.
> > * For the custom observer support (with and without data adapter) and
> >   maintain compatibility with existing implementations, added a new
> >   proxy macro that allows an implementation to selectively provide
> >   its own implementation without being proxied. This is used for
> >   registering/unregistering a data channel observer.
> > * Update the data channel proxy to map most methods to the network
> >   thread, avoiding the interim jump to the signaling thread.
> > * Update a plethora of thread checkers from signaling to network.
> >
> > Bug: webrtc:11547
> > Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> > Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> > Reviewed-by: Henrik Boström <hbos@webrtc.org>
> > Cr-Commit-Position: refs/heads/main@{#39760}
>
> Bug: webrtc:11547
> Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
> Auto-Submit: Andrey Logvin <landrey@webrtc.org>
> Owners-Override: Andrey Logvin <landrey@webrtc.org>
> Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
> Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
> Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39764}

Bug: webrtc:11547
Change-Id: I47dfa7e7168be0cd2faab4f8f3ebf110c3728af5
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300360
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39786}
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 417f829..8c07879 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -892,10 +892,7 @@
     "../rtc_base/system:no_unique_address",
     "../rtc_base/system:unused",
   ]
-  absl_deps = [
-    "//third_party/abseil-cpp/absl/cleanup",
-    "//third_party/abseil-cpp/absl/types:optional",
-  ]
+  absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
 }
 
 rtc_library("data_channel_utils") {
@@ -1326,7 +1323,6 @@
     "../api/video:video_rtp_headers",
     "../call:call_interfaces",
     "../media:media_channel",
-    "../media:media_channel_impl",
     "../media:rtc_media_base",
     "../modules/audio_processing:audio_processing_statistics",
     "../p2p:rtc_p2p",
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 36e8be1..b1ebfc0 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -29,8 +29,15 @@
 }
 
 bool DataChannelController::HasDataChannelsForTest() const {
-  RTC_DCHECK_RUN_ON(signaling_thread());
-  return !sctp_data_channels_.empty();
+  auto has_channels = [&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    return !sctp_data_channels_n_.empty();
+  };
+
+  if (network_thread()->IsCurrent())
+    return has_channels();
+
+  return network_thread()->BlockingCall(std::move(has_channels));
 }
 
 bool DataChannelController::HasUsedDataChannels() const {
@@ -66,11 +73,15 @@
 void DataChannelController::OnChannelStateChanged(
     SctpDataChannel* channel,
     DataChannelInterface::DataState state) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
   if (state == DataChannelInterface::DataState::kClosed)
     OnSctpDataChannelClosed(channel);
 
-  pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
+  signaling_thread()->PostTask(
+      SafeTask(signaling_safety_.flag(),
+               [this, channel_id = channel->internal_id(), state = state] {
+                 pc_->OnSctpDataChannelStateChanged(channel_id, state);
+               }));
 }
 
 void DataChannelController::OnDataReceived(
@@ -82,27 +93,22 @@
   if (HandleOpenMessage_n(channel_id, type, buffer))
     return;
 
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
-        RTC_DCHECK_RUN_ON(signaling_thread());
-        // TODO(bugs.webrtc.org/11547): The data being received should be
-        // delivered on the network thread.
-        auto it = FindChannel(StreamId(channel_id));
-        if (it != sctp_data_channels_.end())
-          (*it)->OnDataReceived(type, buffer);
-      }));
+  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+    return c->sid_n().stream_id_int() == channel_id;
+  });
+
+  if (it != sctp_data_channels_n_.end())
+    (*it)->OnDataReceived(type, buffer);
 }
 
 void DataChannelController::OnChannelClosing(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(), [this, channel_id] {
-        RTC_DCHECK_RUN_ON(signaling_thread());
-        // TODO(bugs.webrtc.org/11547): Should run on the network thread.
-        auto it = FindChannel(StreamId(channel_id));
-        if (it != sctp_data_channels_.end())
-          (*it)->OnClosingProcedureStartedRemotely();
-      }));
+  auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+    return c->sid_n().stream_id_int() == channel_id;
+  });
+
+  if (it != sctp_data_channels_n_.end())
+    (*it)->OnClosingProcedureStartedRemotely();
 }
 
 void DataChannelController::OnChannelClosed(int channel_id) {
@@ -112,48 +118,44 @@
   auto it = absl::c_find_if(sctp_data_channels_n_,
                             [&](const auto& c) { return c->sid_n() == sid; });
 
-  if (it != sctp_data_channels_n_.end())
+  if (it != sctp_data_channels_n_.end()) {
+    rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
     sctp_data_channels_n_.erase(it);
-
-  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    auto it = FindChannel(sid);
-    // Remove the channel from our list, close it and free up resources.
-    if (it != sctp_data_channels_.end()) {
-      rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
-      // Note: this causes OnSctpDataChannelClosed() to not do anything
-      // when called from within `OnClosingProcedureComplete`.
-      sctp_data_channels_.erase(it);
-
-      channel->OnClosingProcedureComplete();
-    }
-  }));
+    channel->OnClosingProcedureComplete();
+  }
 }
 
 void DataChannelController::OnReadyToSend() {
   RTC_DCHECK_RUN_ON(network_thread());
-  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    auto copy = sctp_data_channels_;
-    for (const auto& channel : copy)
+  auto copy = sctp_data_channels_n_;
+  for (const auto& channel : copy) {
+    if (channel->sid_n().HasValue()) {
       channel->OnTransportReady();
-  }));
+    } else {
+      // This happens for role==SSL_SERVER channels when we get notified by
+      // the transport *before* the SDP code calls `AllocateSctpSids` to
+      // trigger assignment of sids. In this case OnTransportReady() will be
+      // called from within `AllocateSctpSids` below.
+      RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
+    }
+  }
 }
 
 void DataChannelController::OnTransportClosed(RTCError error) {
   RTC_DCHECK_RUN_ON(network_thread());
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(), [this, error] {
-        RTC_DCHECK_RUN_ON(signaling_thread());
-        OnTransportChannelClosed(error);
-      }));
+  // This loop will close all data channels and trigger a callback to
+  // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
+  // we create a local copy while we do the fan-out.
+  auto copy = sctp_data_channels_n_;
+  for (const auto& channel : copy)
+    channel->OnTransportChannelClosed(error);
 }
 
 void DataChannelController::SetupDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
 
   // There's a new data channel transport.  This needs to be signaled to the
-  // `sctp_data_channels_` so that they can reopen and reconnect.  This is
+  // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
   // necessary when bundling is applied.
   NotifyDataChannelsOfTransportCreated();
 }
@@ -165,11 +167,12 @@
 
 void DataChannelController::TeardownDataChannelTransport_n() {
   RTC_DCHECK_RUN_ON(network_thread());
-  if (data_channel_transport()) {
-    data_channel_transport()->SetDataSink(nullptr);
+  if (data_channel_transport_) {
+    data_channel_transport_->SetDataSink(nullptr);
+    set_data_channel_transport(nullptr);
   }
-  set_data_channel_transport(nullptr);
   sctp_data_channels_n_.clear();
+  weak_factory_.InvalidateWeakPtrs();
 }
 
 void DataChannelController::OnTransportChanged(
@@ -185,7 +188,7 @@
       new_data_channel_transport->SetDataSink(this);
 
       // There's a new data channel transport.  This needs to be signaled to the
-      // `sctp_data_channels_` so that they can reopen and reconnect.  This is
+      // `sctp_data_channels_n_` so that they can reopen and reconnect.  This is
       // necessary when bundling is applied.
       NotifyDataChannelsOfTransportCreated();
     }
@@ -194,10 +197,10 @@
 
 std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
     const {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
   std::vector<DataChannelStats> stats;
-  stats.reserve(sctp_data_channels_.size());
-  for (const auto& channel : sctp_data_channels_)
+  stats.reserve(sctp_data_channels_n_.size());
+  for (const auto& channel : sctp_data_channels_n_)
     stats.push_back(channel->GetStats());
   return stats;
 }
@@ -219,28 +222,38 @@
                         << channel_id;
   } else {
     config.open_handshake_role = InternalDataChannelInit::kAcker;
-    signaling_thread()->PostTask(
-        SafeTask(signaling_safety_.flag(),
-                 [this, label = std::move(label), config = std::move(config)] {
-                   RTC_DCHECK_RUN_ON(signaling_thread());
-                   OnDataChannelOpenMessage(label, config);
-                 }));
+    auto channel_or_error = CreateDataChannel(label, config);
+    if (channel_or_error.ok()) {
+      signaling_thread()->PostTask(SafeTask(
+          signaling_safety_.flag(),
+          [this, channel = channel_or_error.MoveValue(),
+           ready_to_send = data_channel_transport_->IsReadyToSend()] {
+            RTC_DCHECK_RUN_ON(signaling_thread());
+            OnDataChannelOpenMessage(std::move(channel), ready_to_send);
+          }));
+    } else {
+      RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+                        << ToString(channel_or_error.error().type());
+    }
   }
   return true;
 }
 
 void DataChannelController::OnDataChannelOpenMessage(
-    const std::string& label,
-    const InternalDataChannelInit& config) {
-  auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
-  if (!channel_or_error.ok()) {
-    RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
-                      << ToString(channel_or_error.error().type());
-    return;
-  }
+    rtc::scoped_refptr<SctpDataChannel> channel,
+    bool ready_to_send) {
+  has_used_data_channels_ = true;
+  auto proxy = SctpDataChannel::CreateProxy(channel, signaling_safety_.flag());
 
-  pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
+  pc_->Observer()->OnDataChannel(proxy);
   pc_->NoteDataAddedEvent();
+
+  if (ready_to_send) {
+    network_thread()->PostTask([channel = std::move(channel)] {
+      if (channel->state() != DataChannelInterface::DataState::kClosed)
+        channel->OnTransportReady();
+    });
+  }
 }
 
 // RTC_RUN_ON(network_thread())
@@ -269,6 +282,31 @@
   return RTCError::OK();
 }
 
+// RTC_RUN_ON(network_thread())
+RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
+DataChannelController::CreateDataChannel(const std::string& label,
+                                         InternalDataChannelInit& config) {
+  StreamId sid(config.id);
+  RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
+  if (!err.ok())
+    return err;
+
+  // In case `sid` has changed. Update `config` accordingly.
+  config.id = sid.stream_id_int();
+
+  rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
+      weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
+      config, signaling_thread(), network_thread());
+  RTC_DCHECK(channel);
+  sctp_data_channels_n_.push_back(channel);
+
+  // If we have an id already, notify the transport.
+  if (sid.HasValue())
+    AddSctpDataStream(sid);
+
+  return channel;
+}
+
 RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
 DataChannelController::InternalCreateDataChannelWithProxy(
     const std::string& label,
@@ -283,29 +321,25 @@
   bool ready_to_send = false;
   InternalDataChannelInit new_config = config;
   StreamId sid(new_config.id);
-  auto weak_ptr = weak_factory_.GetWeakPtr();
-  RTC_DCHECK(weak_ptr);  // Associate with current thread.
   auto ret = network_thread()->BlockingCall(
       [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
         RTC_DCHECK_RUN_ON(network_thread());
-        RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
-        if (!err.ok())
-          return err;
-
-        // In case `sid` has changed. Update `new_config` accordingly.
-        new_config.id = sid.stream_id_int();
+        auto channel = CreateDataChannel(label, new_config);
+        if (!channel.ok())
+          return channel;
         ready_to_send =
             data_channel_transport_ && data_channel_transport_->IsReadyToSend();
-
-        rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
-            std::move(weak_ptr), label, data_channel_transport_ != nullptr,
-            new_config, signaling_thread(), network_thread()));
-        RTC_DCHECK(channel);
-        sctp_data_channels_n_.push_back(channel);
-
-        // If we have an id already, notify the transport.
-        if (sid.HasValue())
-          AddSctpDataStream(sid);
+        if (ready_to_send) {
+          // If the transport is ready to send because the initial channel
+          // ready signal may have been sent before the DataChannel creation.
+          // This has to be done async because the upper layer objects (e.g.
+          // Chrome glue and WebKit) are not wired up properly until after
+          // `InternalCreateDataChannelWithProxy` returns.
+          network_thread()->PostTask([channel = channel.value()] {
+            if (channel->state() != DataChannelInterface::DataState::kClosed)
+              channel->OnTransportReady();
+          });
+        }
 
         return channel;
       });
@@ -313,114 +347,72 @@
   if (!ret.ok())
     return ret.MoveError();
 
-  if (ready_to_send) {
-    // Checks if the transport is ready to send because the initial channel
-    // ready signal may have been sent before the DataChannel creation.
-    // This has to be done async because the upper layer objects (e.g.
-    // Chrome glue and WebKit) are not wired up properly until after this
-    // function returns.
-    signaling_thread()->PostTask(
-        SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
-          if (channel->state() != DataChannelInterface::DataState::kClosed)
-            channel->OnTransportReady();
-        }));
-  }
-
-  sctp_data_channels_.push_back(ret.value());
   has_used_data_channels_ = true;
-  return SctpDataChannel::CreateProxy(ret.MoveValue());
+  return SctpDataChannel::CreateProxy(ret.MoveValue(),
+                                      signaling_safety_.flag());
 }
 
 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
+
+  const bool ready_to_send =
+      data_channel_transport_ && data_channel_transport_->IsReadyToSend();
 
   std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
-
-  network_thread()->BlockingCall([&] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    for (auto it = sctp_data_channels_n_.begin();
-         it != sctp_data_channels_n_.end();) {
-      if (!(*it)->sid_n().HasValue()) {
-        StreamId sid = sid_allocator_.AllocateSid(role);
-        if (sid.HasValue()) {
-          (*it)->SetSctpSid_n(sid);
-          AddSctpDataStream(sid);
-          channels_to_update.push_back(std::make_pair((*it).get(), sid));
-        } else {
-          channels_to_close.push_back(std::move(*it));
-          it = sctp_data_channels_n_.erase(it);
-          continue;
+  for (auto it = sctp_data_channels_n_.begin();
+       it != sctp_data_channels_n_.end();) {
+    if (!(*it)->sid_n().HasValue()) {
+      StreamId sid = sid_allocator_.AllocateSid(role);
+      if (sid.HasValue()) {
+        (*it)->SetSctpSid_n(sid);
+        AddSctpDataStream(sid);
+        if (ready_to_send) {
+          RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
+          (*it)->OnTransportReady();
         }
+        channels_to_update.push_back(std::make_pair((*it).get(), sid));
+      } else {
+        channels_to_close.push_back(std::move(*it));
+        it = sctp_data_channels_n_.erase(it);
+        continue;
       }
-      ++it;
     }
-  });
+    ++it;
+  }
 
   // Since closing modifies the list of channels, we have to do the actual
   // closing outside the loop.
   for (const auto& channel : channels_to_close) {
     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
-    // The channel should now have been removed from sctp_data_channels_.
-    RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
-                 return c.get() == channel.get();
-               }) == sctp_data_channels_.end());
-  }
-
-  for (auto& pair : channels_to_update) {
-    auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
-      return c.get() == pair.first;
-    });
-    RTC_DCHECK(it != sctp_data_channels_.end());
-    (*it)->SetSctpSid_s(pair.second);
   }
 }
 
 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
-
-  network_thread()->BlockingCall([&] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    // After the closing procedure is done, it's safe to use this ID for
-    // another data channel.
-    if (channel->sid_n().HasValue()) {
-      sid_allocator_.ReleaseSid(channel->sid_n());
-    }
-
-    auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
-      return c.get() == channel;
-    });
-
-    if (it != sctp_data_channels_n_.end())
-      sctp_data_channels_n_.erase(it);
-  });
-
-  for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
-       ++it) {
-    if (it->get() == channel) {
-      // Since this method is triggered by a signal from the DataChannel,
-      // we can't free it directly here; we need to free it asynchronously.
-      rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
-      sctp_data_channels_.erase(it);
-      signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
-                                            [release = std::move(release)] {}));
-      return;
-    }
+  RTC_DCHECK_RUN_ON(network_thread());
+  // After the closing procedure is done, it's safe to use this ID for
+  // another data channel.
+  if (channel->sid_n().HasValue()) {
+    sid_allocator_.ReleaseSid(channel->sid_n());
   }
+  auto it = absl::c_find_if(sctp_data_channels_n_,
+                            [&](const auto& c) { return c.get() == channel; });
+  if (it != sctp_data_channels_n_.end())
+    sctp_data_channels_n_.erase(it);
 }
 
 void DataChannelController::OnTransportChannelClosed(RTCError error) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
   // Use a temporary copy of the SCTP DataChannel list because the
   // DataChannel may callback to us and try to modify the list.
   // TODO(tommi): `OnTransportChannelClosed` is called from
   // `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
   // `TeardownDataChannelTransport_n` is called (but on the network thread) from
-  // the same function. Once `sctp_data_channels_` moves to the network thread,
-  // we can get rid of this function (OnTransportChannelClosed) and run this
-  // loop from within the TeardownDataChannelTransport_n callback.
+  // the same function. We can now get rid of this function
+  // (OnTransportChannelClosed) and run this loop from within the
+  // TeardownDataChannelTransport_n callback.
   std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
-  temp_sctp_dcs.swap(sctp_data_channels_);
+  temp_sctp_dcs.swap(sctp_data_channels_n_);
   for (const auto& channel : temp_sctp_dcs) {
     channel->OnTransportChannelClosed(error);
   }
@@ -444,16 +436,10 @@
     StreamId sid,
     const SendDataParams& params,
     const rtc::CopyOnWriteBuffer& payload) {
-  // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
-  // thread instead. Remove the BlockingCall() below and move associated state
-  // to the network thread.
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
   RTC_DCHECK(data_channel_transport());
-
-  return network_thread()->BlockingCall([this, sid, params, payload] {
-    return data_channel_transport()->SendData(sid.stream_id_int(), params,
-                                              payload);
-  });
+  return data_channel_transport()->SendData(sid.stream_id_int(), params,
+                                            payload);
 }
 
 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@@ -463,27 +449,14 @@
   for (const auto& channel : sctp_data_channels_n_) {
     if (channel->sid_n().HasValue())
       AddSctpDataStream(channel->sid_n());
+    channel->OnTransportChannelCreated();
   }
-
-  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    for (const auto& channel : sctp_data_channels_) {
-      channel->OnTransportChannelCreated();
-    }
-  }));
-}
-
-std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
-DataChannelController::FindChannel(StreamId stream_id) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
-  return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
-    return c->sid_s() == stream_id;
-  });
 }
 
 rtc::Thread* DataChannelController::network_thread() const {
   return pc_->network_thread();
 }
+
 rtc::Thread* DataChannelController::signaling_thread() const {
   return pc_->signaling_thread();
 }
diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h
index fa6c13e..074b1fe 100644
--- a/pc/data_channel_controller.h
+++ b/pc/data_channel_controller.h
@@ -107,6 +107,11 @@
   rtc::Thread* signaling_thread() const;
 
  private:
+  // Creates a new SctpDataChannel object on the network thread.
+  RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> CreateDataChannel(
+      const std::string& label,
+      InternalDataChannelInit& config) RTC_RUN_ON(network_thread());
+
   // Parses and handles open messages.  Returns true if the message is an open
   // message and should be considered to be handled, false otherwise.
   bool HandleOpenMessage_n(int channel_id,
@@ -114,8 +119,8 @@
                            const rtc::CopyOnWriteBuffer& buffer)
       RTC_RUN_ON(network_thread());
   // Called when a valid data channel OPEN message is received.
-  void OnDataChannelOpenMessage(const std::string& label,
-                                const InternalDataChannelInit& config)
+  void OnDataChannelOpenMessage(rtc::scoped_refptr<SctpDataChannel> channel,
+                                bool ready_to_send)
       RTC_RUN_ON(signaling_thread());
 
   // Accepts a `StreamId` which may be pre-negotiated or unassigned. For
@@ -139,9 +144,6 @@
   // (calls OnTransportChannelCreated on the signaling thread).
   void NotifyDataChannelsOfTransportCreated();
 
-  std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator FindChannel(
-      StreamId stream_id);
-
   // Plugin transport used for data channels.  Pointer may be accessed and
   // checked from any thread, but the object may only be touched on the
   // network thread.
@@ -149,21 +151,16 @@
   // thread.
   DataChannelTransportInterface* data_channel_transport_ = nullptr;
   SctpSidAllocator sid_allocator_ RTC_GUARDED_BY(network_thread());
-  std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_
-      RTC_GUARDED_BY(signaling_thread());
-  // TODO(bugs.webrtc.org/11547): This vector will eventually take over from
-  // `sctp_data_channels_`. While we're migrating away from thread hops
-  // between the signaling and network threads, we need both, so this is
-  // a temporary situation.
   std::vector<rtc::scoped_refptr<SctpDataChannel>> sctp_data_channels_n_
       RTC_GUARDED_BY(network_thread());
   bool has_used_data_channels_ RTC_GUARDED_BY(signaling_thread()) = false;
 
   // Owning PeerConnection.
   PeerConnectionInternal* const pc_;
-  // The weak pointers must be dereferenced and invalidated on the signalling
+  // The weak pointers must be dereferenced and invalidated on the network
   // thread only.
-  rtc::WeakPtrFactory<DataChannelController> weak_factory_{this};
+  rtc::WeakPtrFactory<DataChannelController> weak_factory_
+      RTC_GUARDED_BY(network_thread()){this};
   ScopedTaskSafety signaling_safety_;
 };
 
diff --git a/pc/data_channel_controller_unittest.cc b/pc/data_channel_controller_unittest.cc
index 9b66fac..fd3deae 100644
--- a/pc/data_channel_controller_unittest.cc
+++ b/pc/data_channel_controller_unittest.cc
@@ -131,44 +131,6 @@
   channel->Close();
 }
 
-TEST_F(DataChannelControllerTest, AsyncChannelCloseTeardown) {
-  DataChannelControllerForTest dcc(pc_.get());
-  auto ret = dcc.InternalCreateDataChannelWithProxy(
-      "label", InternalDataChannelInit(DataChannelInit()));
-  ASSERT_TRUE(ret.ok());
-  auto channel = ret.MoveValue();
-  SctpDataChannel* inner_channel =
-      DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
-          channel.get());
-  // Grab a reference for testing purposes.
-  inner_channel->AddRef();
-
-  channel = nullptr;  // dcc still holds a reference to `channel`.
-  EXPECT_TRUE(dcc.HasDataChannelsForTest());
-
-  // Trigger a Close() for the channel. This will send events back to dcc,
-  // eventually reaching `OnSctpDataChannelClosed` where dcc removes
-  // the channel from the internal list of data channels, but does not release
-  // the reference synchronously since that reference might be the last one.
-  inner_channel->Close();
-  // Now there should be no tracked data channels.
-  EXPECT_FALSE(dcc.HasDataChannelsForTest());
-  // But there should be an async operation queued that still holds a reference.
-  // That means that the test reference, must not be the last one.
-  ASSERT_NE(inner_channel->Release(),
-            rtc::RefCountReleaseStatus::kDroppedLastRef);
-  // Grab a reference again (using the pointer is safe since the object still
-  // exists and we control the single-threaded environment manually).
-  inner_channel->AddRef();
-  // Now run the queued up async operations on the signaling (current) thread.
-  // This time, the reference formerly owned by dcc, should be release and the
-  // truly last reference is now held by the test.
-  run_loop_.Flush();
-  // Check that this is the last reference.
-  EXPECT_EQ(inner_channel->Release(),
-            rtc::RefCountReleaseStatus::kDroppedLastRef);
-}
-
 // Allocate the maximum number of data channels and then one more.
 // The last allocation should fail.
 TEST_F(DataChannelControllerTest, MaxChannels) {
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index e52590b..7c88d29 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -77,10 +77,12 @@
         controller_(new FakeDataChannelController(&network_thread_)) {
     network_thread_.Start();
     inner_channel_ = controller_->CreateDataChannel("test", init_);
-    channel_ = webrtc::SctpDataChannel::CreateProxy(inner_channel_);
+    channel_ =
+        webrtc::SctpDataChannel::CreateProxy(inner_channel_, signaling_safety_);
   }
   ~SctpDataChannelTest() override {
     run_loop_.Flush();
+    signaling_safety_->SetNotAlive();
     inner_channel_ = nullptr;
     channel_ = nullptr;
     controller_.reset();
@@ -90,11 +92,17 @@
 
   void SetChannelReady() {
     controller_->set_transport_available(true);
-    inner_channel_->OnTransportChannelCreated();
-    if (!inner_channel_->sid_s().HasValue()) {
-      SetChannelSid(inner_channel_, StreamId(0));
-    }
+    StreamId sid(0);
+    network_thread_.BlockingCall([&]() {
+      RTC_DCHECK_RUN_ON(&network_thread_);
+      if (!inner_channel_->sid_n().HasValue()) {
+        inner_channel_->SetSctpSid_n(sid);
+        controller_->AddSctpDataStream(sid);
+      }
+      inner_channel_->OnTransportChannelCreated();
+    });
     controller_->set_ready_to_send(true);
+    run_loop_.Flush();
   }
 
   // TODO(bugs.webrtc.org/11547): This mirrors what the DataChannelController
@@ -105,9 +113,10 @@
   void SetChannelSid(const rtc::scoped_refptr<SctpDataChannel>& channel,
                      StreamId sid) {
     RTC_DCHECK(sid.HasValue());
-    network_thread_.BlockingCall(
-        [&]() { controller_->AddSctpDataStream(sid); });
-    channel->SetSctpSid_s(sid);
+    network_thread_.BlockingCall([&]() {
+      channel->SetSctpSid_n(sid);
+      controller_->AddSctpDataStream(sid);
+    });
   }
 
   void AddObserver() {
@@ -118,6 +127,8 @@
   test::RunLoop run_loop_;
   rtc::Thread network_thread_;
   InternalDataChannelInit init_;
+  rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_ =
+      PendingTaskSafetyFlag::Create();
   std::unique_ptr<FakeDataChannelController> controller_;
   std::unique_ptr<FakeDataChannelObserver> observer_;
   rtc::scoped_refptr<SctpDataChannel> inner_channel_;
@@ -143,11 +154,13 @@
 
   // Check the non-const part of the configuration.
   EXPECT_EQ(channel_->id(), init_.id);
-  EXPECT_EQ(inner_channel_->sid_s(), StreamId());
+  network_thread_.BlockingCall(
+      [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId()); });
 
   SetChannelReady();
   EXPECT_EQ(channel_->id(), 0);
-  EXPECT_EQ(inner_channel_->sid_s(), StreamId(0));
+  network_thread_.BlockingCall(
+      [&]() { EXPECT_EQ(inner_channel_->sid_n(), StreamId(0)); });
 }
 
 // Verifies that the data channel is connected to the transport after creation.
@@ -155,13 +168,15 @@
   controller_->set_transport_available(true);
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init_);
-
   EXPECT_TRUE(controller_->IsConnected(dc.get()));
+
   // The sid is not set yet, so it should not have added the streams.
-  EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s()));
+  StreamId sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
+  EXPECT_FALSE(controller_->IsStreamAdded(sid));
 
   SetChannelSid(dc, StreamId(0));
-  EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s()));
+  sid = network_thread_.BlockingCall([&]() { return dc->sid_n(); });
+  EXPECT_TRUE(controller_->IsStreamAdded(sid));
 }
 
 // Tests the state of the data channel.
@@ -180,7 +195,7 @@
   channel_->Close();
   // The (simulated) transport close notifications runs on the network thread
   // and posts a completion notification to the signaling (current) thread.
-  // Allow that ooperation to complete before checking the state.
+  // Allow that operation to complete before checking the state.
   run_loop_.Flush();
   EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
   EXPECT_EQ(observer_->on_state_change_count(), 3u);
@@ -198,6 +213,7 @@
   EXPECT_TRUE(channel_->Send(buffer));
   size_t successful_send_count = 1;
 
+  run_loop_.Flush();
   EXPECT_EQ(0U, channel_->buffered_amount());
   EXPECT_EQ(successful_send_count,
             observer_->on_buffered_amount_change_count());
@@ -214,6 +230,7 @@
             observer_->on_buffered_amount_change_count());
 
   controller_->set_send_blocked(false);
+  run_loop_.Flush();
   successful_send_count += number_of_packets;
   EXPECT_EQ(0U, channel_->buffered_amount());
   EXPECT_EQ(successful_send_count,
@@ -334,10 +351,9 @@
   SetChannelReady();
   InternalDataChannelInit init;
   init.id = 1;
-  rtc::scoped_refptr<SctpDataChannel> dc =
-      controller_->CreateDataChannel("test1", init);
-  EXPECT_EQ(DataChannelInterface::kConnecting, dc->state());
-  EXPECT_TRUE_WAIT(DataChannelInterface::kOpen == dc->state(), 1000);
+  auto dc = webrtc::SctpDataChannel::CreateProxy(
+      controller_->CreateDataChannel("test1", init), signaling_safety_);
+  EXPECT_EQ(DataChannelInterface::kOpen, dc->state());
 }
 
 // Tests that an unordered DataChannel sends data as ordered until the OPEN_ACK
@@ -349,21 +365,23 @@
   init.ordered = false;
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init);
+  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
 
   // Sends a message and verifies it's ordered.
   DataBuffer buffer("some data");
-  ASSERT_TRUE(dc->Send(buffer));
+  ASSERT_TRUE(proxy->Send(buffer));
   EXPECT_TRUE(controller_->last_send_data_params().ordered);
 
   // Emulates receiving an OPEN_ACK message.
   rtc::CopyOnWriteBuffer payload;
   WriteDataChannelOpenAckMessage(&payload);
-  dc->OnDataReceived(DataMessageType::kControl, payload);
+  network_thread_.BlockingCall(
+      [&] { dc->OnDataReceived(DataMessageType::kControl, payload); });
 
   // Sends another message and verifies it's unordered.
-  ASSERT_TRUE(dc->Send(buffer));
+  ASSERT_TRUE(proxy->Send(buffer));
   EXPECT_FALSE(controller_->last_send_data_params().ordered);
 }
 
@@ -376,15 +394,17 @@
   init.ordered = false;
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", init);
+  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
 
   // Emulates receiving a DATA message.
   DataBuffer buffer("data");
-  dc->OnDataReceived(DataMessageType::kText, buffer.data);
+  network_thread_.BlockingCall(
+      [&] { dc->OnDataReceived(DataMessageType::kText, buffer.data); });
 
   // Sends a message and verifies it's unordered.
-  ASSERT_TRUE(dc->Send(buffer));
+  ASSERT_TRUE(proxy->Send(buffer));
   EXPECT_FALSE(controller_->last_send_data_params().ordered);
 }
 
@@ -437,7 +457,10 @@
   AddObserver();
 
   DataBuffer buffer("abcd");
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
+  network_thread_.BlockingCall([&] {
+    inner_channel_->OnDataReceived(DataMessageType::kText, buffer.data);
+  });
+  run_loop_.Flush();
   EXPECT_EQ(1U, observer_->messages_received());
 }
 
@@ -452,8 +475,9 @@
   SetChannelReady();
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", config);
+  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
   EXPECT_EQ(0, controller_->last_sid());
 }
 
@@ -477,9 +501,10 @@
   EXPECT_EQ(0U, channel_->bytes_received());
 
   // Receive three buffers while data channel isn't open.
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[0].data);
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[1].data);
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[2].data);
+  network_thread_.BlockingCall([&] {
+    for (int i : {0, 1, 2})
+      inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
+  });
   EXPECT_EQ(0U, observer_->messages_received());
   EXPECT_EQ(0U, channel_->messages_received());
   EXPECT_EQ(0U, channel_->bytes_received());
@@ -493,9 +518,11 @@
   EXPECT_EQ(bytes_received, channel_->bytes_received());
 
   // Receive three buffers while open.
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[3].data);
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[4].data);
-  inner_channel_->OnDataReceived(DataMessageType::kText, buffers[5].data);
+  network_thread_.BlockingCall([&] {
+    for (int i : {3, 4, 5})
+      inner_channel_->OnDataReceived(DataMessageType::kText, buffers[i].data);
+  });
+  run_loop_.Flush();
   bytes_received += buffers[3].size() + buffers[4].size() + buffers[5].size();
   EXPECT_EQ(6U, observer_->messages_received());
   EXPECT_EQ(6U, channel_->messages_received());
@@ -513,8 +540,9 @@
   SetChannelReady();
   rtc::scoped_refptr<SctpDataChannel> dc =
       controller_->CreateDataChannel("test1", config);
+  auto proxy = webrtc::SctpDataChannel::CreateProxy(dc, signaling_safety_);
 
-  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, dc->state(), 1000);
+  EXPECT_EQ_WAIT(DataChannelInterface::kOpen, proxy->state(), 1000);
 
   EXPECT_EQ(config.id, controller_->last_sid());
   EXPECT_EQ(DataMessageType::kControl,
@@ -551,9 +579,8 @@
     EXPECT_TRUE(channel_->Send(packet));
   }
 
-  // The sending buffer shoul be full, send returns false.
+  // The sending buffer should be full, `Send()` returns false.
   EXPECT_FALSE(channel_->Send(packet));
-
   EXPECT_TRUE(DataChannelInterface::kOpen == channel_->state());
 }
 
@@ -577,10 +604,12 @@
   rtc::CopyOnWriteBuffer buffer(1024);
   memset(buffer.MutableData(), 0, buffer.size());
 
-  // Receiving data without having an observer will overflow the buffer.
-  for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
-    inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
-  }
+  network_thread_.BlockingCall([&] {
+    // Receiving data without having an observer will overflow the buffer.
+    for (size_t i = 0; i < 16 * 1024 + 1; ++i) {
+      inner_channel_->OnDataReceived(DataMessageType::kText, buffer);
+    }
+  });
   EXPECT_EQ(DataChannelInterface::kClosed, channel_->state());
   EXPECT_FALSE(channel_->error().ok());
   EXPECT_EQ(RTCErrorType::RESOURCE_EXHAUSTED, channel_->error().type());
@@ -601,7 +630,8 @@
 // Tests that a channel can be closed without being opened or assigned an sid.
 TEST_F(SctpDataChannelTest, NeverOpened) {
   controller_->set_transport_available(true);
-  inner_channel_->OnTransportChannelCreated();
+  network_thread_.BlockingCall(
+      [&] { inner_channel_->OnTransportChannelCreated(); });
   channel_->Close();
 }
 
@@ -631,7 +661,8 @@
   // transition to the "closed" state.
   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, "");
   error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
-  inner_channel_->OnTransportChannelClosed(error);
+  network_thread_.BlockingCall(
+      [&] { inner_channel_->OnTransportChannelClosed(error); });
   controller_.reset(nullptr);
   EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
                  kDefaultTimeout);
@@ -651,7 +682,8 @@
   error.set_error_detail(RTCErrorDetailType::SCTP_FAILURE);
   error.set_sctp_cause_code(
       static_cast<uint16_t>(cricket::SctpErrorCauseCode::kProtocolViolation));
-  inner_channel_->OnTransportChannelClosed(error);
+  network_thread_.BlockingCall(
+      [&] { inner_channel_->OnTransportChannelClosed(error); });
   controller_.reset(nullptr);
   EXPECT_EQ_WAIT(DataChannelInterface::kClosed, channel_->state(),
                  kDefaultTimeout);
diff --git a/pc/legacy_stats_collector.cc b/pc/legacy_stats_collector.cc
index 6829e35..bc5bb5f 100644
--- a/pc/legacy_stats_collector.cc
+++ b/pc/legacy_stats_collector.cc
@@ -34,7 +34,6 @@
 #include "api/video/video_timing.h"
 #include "call/call.h"
 #include "media/base/media_channel.h"
-#include "media/base/media_channel_impl.h"
 #include "modules/audio_processing/include/audio_processing_statistics.h"
 #include "p2p/base/ice_transport_internal.h"
 #include "p2p/base/p2p_constants.h"
@@ -670,7 +669,7 @@
   // to fetch stats, then applies them on the signaling thread. See if we need
   // to do this synchronously or if updating the stats without blocking is safe.
   std::map<std::string, std::string> transport_names_by_mid =
-      ExtractSessionInfo();
+      ExtractSessionAndDataInfo();
 
   // TODO(tommi): All of these hop over to the worker thread to fetch
   // information.  We could post a task to run all of these and post
@@ -681,7 +680,6 @@
   ExtractBweInfo();
   ExtractMediaInfo(transport_names_by_mid);
   ExtractSenderInfo();
-  ExtractDataInfo();
   UpdateTrackReports();
 }
 
@@ -856,19 +854,26 @@
   return report;
 }
 
-std::map<std::string, std::string> LegacyStatsCollector::ExtractSessionInfo() {
-  TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionInfo");
+std::map<std::string, std::string>
+LegacyStatsCollector::ExtractSessionAndDataInfo() {
+  TRACE_EVENT0("webrtc", "LegacyStatsCollector::ExtractSessionAndDataInfo");
   RTC_DCHECK_RUN_ON(pc_->signaling_thread());
 
   SessionStats stats;
+  StatsCollection::Container data_report_collection;
   auto transceivers = pc_->GetTransceiversInternal();
   pc_->network_thread()->BlockingCall(
       [&, sctp_transport_name = pc_->sctp_transport_name(),
        sctp_mid = pc_->sctp_mid()]() mutable {
         stats = ExtractSessionInfo_n(
             transceivers, std::move(sctp_transport_name), std::move(sctp_mid));
+        StatsCollection data_reports;
+        ExtractDataInfo_n(&data_reports);
+        data_report_collection = data_reports.DetachCollection();
       });
 
+  reports_.MergeCollection(std::move(data_report_collection));
+
   ExtractSessionInfo_s(stats);
 
   return std::move(stats.transport_names_by_mid);
@@ -1292,8 +1297,8 @@
   }
 }
 
-void LegacyStatsCollector::ExtractDataInfo() {
-  RTC_DCHECK_RUN_ON(pc_->signaling_thread());
+void LegacyStatsCollector::ExtractDataInfo_n(StatsCollection* reports) {
+  RTC_DCHECK_RUN_ON(pc_->network_thread());
 
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
 
@@ -1301,7 +1306,7 @@
   for (const auto& stats : data_stats) {
     StatsReport::Id id(StatsReport::NewTypedIntId(
         StatsReport::kStatsReportTypeDataChannel, stats.id));
-    StatsReport* report = reports_.ReplaceOrAddNew(id);
+    StatsReport* report = reports->ReplaceOrAddNew(id);
     report->set_timestamp(stats_gathering_started_);
     report->AddString(StatsReport::kStatsValueNameLabel, stats.label);
     // Filter out the initial id (-1).
diff --git a/pc/legacy_stats_collector.h b/pc/legacy_stats_collector.h
index cedd36c..e905b39 100644
--- a/pc/legacy_stats_collector.h
+++ b/pc/legacy_stats_collector.h
@@ -165,11 +165,13 @@
                                        const StatsReport::Id& channel_report_id,
                                        const cricket::ConnectionInfo& info);
 
-  void ExtractDataInfo();
+  void ExtractDataInfo_n(StatsCollection* reports);
 
   // Returns the `transport_names_by_mid` member from the SessionStats as
-  // gathered and used to populate the stats.
-  std::map<std::string, std::string> ExtractSessionInfo();
+  // gathered and used to populate the stats. Contains one synchronous hop
+  // to the network thread to get this information along with querying data
+  // channel stats at the same time and populating `reports_`.
+  std::map<std::string, std::string> ExtractSessionAndDataInfo();
 
   void ExtractBweInfo();
   void ExtractMediaInfo(
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index fdbd32b..82c5914 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2286,7 +2286,7 @@
 }
 
 std::vector<DataChannelStats> PeerConnection::GetDataChannelStats() const {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK_RUN_ON(network_thread());
   return data_channel_controller_.GetDataChannelStats();
 }
 
diff --git a/pc/proxy.h b/pc/proxy.h
index a8b75e3..b0782bb 100644
--- a/pc/proxy.h
+++ b/pc/proxy.h
@@ -449,7 +449,6 @@
     TRACE_BOILERPLATE(method);               \
     return c_->method();                     \
   }
-
 // Allows a custom implementation of a method where the otherwise proxied
 // implementation can do a more efficient, yet thread-safe, job than the proxy
 // can do by default or when more flexibility is needed than can be provided
diff --git a/pc/rtc_stats_collector.cc b/pc/rtc_stats_collector.cc
index 429e9d0..574e1cc 100644
--- a/pc/rtc_stats_collector.cc
+++ b/pc/rtc_stats_collector.cc
@@ -1499,7 +1499,6 @@
   RTC_DCHECK_RUN_ON(signaling_thread_);
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
 
-  ProduceDataChannelStats_s(timestamp, partial_report);
   ProduceMediaStreamStats_s(timestamp, partial_report);
   ProduceMediaStreamTrackStats_s(timestamp, partial_report);
   ProduceMediaSourceStats_s(timestamp, partial_report);
@@ -1519,6 +1518,8 @@
   // `network_report_event_` is reset before this method is invoked.
   network_report_ = RTCStatsReport::Create(timestamp);
 
+  ProduceDataChannelStats_n(timestamp, network_report_.get());
+
   std::set<std::string> transport_names;
   if (sctp_transport_name) {
     transport_names.emplace(std::move(*sctp_transport_name));
@@ -1653,10 +1654,10 @@
   }
 }
 
-void RTCStatsCollector::ProduceDataChannelStats_s(
+void RTCStatsCollector::ProduceDataChannelStats_n(
     Timestamp timestamp,
     RTCStatsReport* report) const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   rtc::Thread::ScopedDisallowBlockingCalls no_blocking_calls;
   std::vector<DataChannelStats> data_stats = pc_->GetDataChannelStats();
   for (const auto& stats : data_stats) {
diff --git a/pc/rtc_stats_collector.h b/pc/rtc_stats_collector.h
index ac0453f..34962bf 100644
--- a/pc/rtc_stats_collector.h
+++ b/pc/rtc_stats_collector.h
@@ -186,7 +186,7 @@
       const std::map<std::string, CertificateStatsPair>& transport_cert_stats,
       RTCStatsReport* report) const;
   // Produces `RTCDataChannelStats`.
-  void ProduceDataChannelStats_s(Timestamp timestamp,
+  void ProduceDataChannelStats_n(Timestamp timestamp,
                                  RTCStatsReport* report) const;
   // Produces `RTCIceCandidatePairStats` and `RTCIceCandidateStats`.
   void ProduceIceCandidateAndPairStats_n(
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 623a153..18d464b 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -15,7 +15,6 @@
 #include <string>
 #include <utility>
 
-#include "absl/cleanup/cleanup.h"
 #include "media/sctp/sctp_transport_internal.h"
 #include "pc/proxy.h"
 #include "rtc_base/checks.h"
@@ -38,8 +37,8 @@
 // Define proxy for DataChannelInterface.
 BEGIN_PROXY_MAP(DataChannel)
 PROXY_PRIMARY_THREAD_DESTRUCTOR()
-PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
-PROXY_METHOD0(void, UnregisterObserver)
+BYPASS_PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
+BYPASS_PROXY_METHOD0(void, UnregisterObserver)
 BYPASS_PROXY_CONSTMETHOD0(std::string, label)
 BYPASS_PROXY_CONSTMETHOD0(bool, reliable)
 BYPASS_PROXY_CONSTMETHOD0(bool, ordered)
@@ -50,20 +49,18 @@
 BYPASS_PROXY_CONSTMETHOD0(std::string, protocol)
 BYPASS_PROXY_CONSTMETHOD0(bool, negotiated)
 // Can't bypass the proxy since the id may change.
-PROXY_CONSTMETHOD0(int, id)
+PROXY_SECONDARY_CONSTMETHOD0(int, id)
 BYPASS_PROXY_CONSTMETHOD0(Priority, priority)
-PROXY_CONSTMETHOD0(DataState, state)
-PROXY_CONSTMETHOD0(RTCError, error)
-PROXY_CONSTMETHOD0(uint32_t, messages_sent)
-PROXY_CONSTMETHOD0(uint64_t, bytes_sent)
-PROXY_CONSTMETHOD0(uint32_t, messages_received)
-PROXY_CONSTMETHOD0(uint64_t, bytes_received)
-PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
-PROXY_METHOD0(void, Close)
-// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
-PROXY_METHOD1(bool, Send, const DataBuffer&)
+BYPASS_PROXY_CONSTMETHOD0(DataState, state)
+BYPASS_PROXY_CONSTMETHOD0(RTCError, error)
+PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_sent)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_sent)
+PROXY_SECONDARY_CONSTMETHOD0(uint32_t, messages_received)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, bytes_received)
+PROXY_SECONDARY_CONSTMETHOD0(uint64_t, buffered_amount)
+PROXY_SECONDARY_METHOD0(void, Close)
+PROXY_SECONDARY_METHOD1(bool, Send, const DataBuffer&)
 END_PROXY_MAP(DataChannel)
-
 }  // namespace
 
 InternalDataChannelInit::InternalDataChannelInit(const DataChannelInit& base)
@@ -142,6 +139,138 @@
   used_sids_.erase(sid);
 }
 
+// A DataChannelObserver implementation that offers backwards compatibility with
+// implementations that aren't yet ready to be called back on the network
+// thread. This implementation posts events to the signaling thread where
+// events are delivered.
+// In the class, and together with the `SctpDataChannel` implementation, there's
+// special handling for the `state()` property whereby if that property is
+// queried on the channel object while inside an event callback, we return
+// the state that was active at the time the event was issued. This is to avoid
+// a problem with calling the `state()` getter on the proxy, which would do
+// a blocking call to the network thread, effectively flushing operations on
+// the network thread that could cause the state to change and eventually return
+// a misleading or arguably, wrong, state value to the callback implementation.
+// As a future improvement to the ObserverAdapter, we could do the same for
+// other properties that need to be read on the network thread. Eventually
+// all implementations should expect to be called on the network thread though
+// and the ObserverAdapter no longer be necessary.
+class SctpDataChannel::ObserverAdapter : public DataChannelObserver {
+ public:
+  explicit ObserverAdapter(
+      SctpDataChannel* channel,
+      rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety)
+      : channel_(channel), signaling_safety_(std::move(signaling_safety)) {}
+
+  bool IsInsideCallback() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    return cached_getters_ != nullptr;
+  }
+
+  DataChannelInterface::DataState cached_state() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    RTC_DCHECK(IsInsideCallback());
+    return cached_getters_->state();
+  }
+
+  RTCError cached_error() const {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    RTC_DCHECK(IsInsideCallback());
+    return cached_getters_->error();
+  }
+
+  void SetDelegate(DataChannelObserver* delegate) {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    delegate_ = delegate;
+    safety_.reset(PendingTaskSafetyFlag::CreateDetached());
+  }
+
+  static void DeleteOnSignalingThread(
+      std::unique_ptr<ObserverAdapter> observer) {
+    auto* signaling_thread = observer->signaling_thread();
+    if (!signaling_thread->IsCurrent())
+      signaling_thread->PostTask([observer = std::move(observer)]() {});
+  }
+
+ private:
+  class CachedGetters {
+   public:
+    explicit CachedGetters(ObserverAdapter* adapter)
+        : adapter_(adapter),
+          cached_state_(adapter_->channel_->state()),
+          cached_error_(adapter_->channel_->error()) {
+      RTC_DCHECK_RUN_ON(adapter->network_thread());
+    }
+
+    ~CachedGetters() {
+      if (!was_dropped_) {
+        RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
+        RTC_DCHECK_EQ(adapter_->cached_getters_, this);
+        adapter_->cached_getters_ = nullptr;
+      }
+    }
+
+    bool PrepareForCallback() {
+      RTC_DCHECK_RUN_ON(adapter_->signaling_thread());
+      RTC_DCHECK(was_dropped_);
+      was_dropped_ = false;
+      adapter_->cached_getters_ = this;
+      return adapter_->delegate_ && adapter_->signaling_safety_->alive();
+    }
+
+    RTCError error() { return cached_error_; }
+    DataChannelInterface::DataState state() { return cached_state_; }
+
+   private:
+    ObserverAdapter* const adapter_;
+    bool was_dropped_ = true;
+    const DataChannelInterface::DataState cached_state_;
+    const RTCError cached_error_;
+  };
+
+  void OnStateChange() override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(
+        SafeTask(safety_.flag(),
+                 [this, cached_state = std::make_unique<CachedGetters>(this)] {
+                   RTC_DCHECK_RUN_ON(signaling_thread());
+                   if (cached_state->PrepareForCallback())
+                     delegate_->OnStateChange();
+                 }));
+  }
+
+  void OnMessage(const DataBuffer& buffer) override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(SafeTask(
+        safety_.flag(), [this, buffer = buffer,
+                         cached_state = std::make_unique<CachedGetters>(this)] {
+          RTC_DCHECK_RUN_ON(signaling_thread());
+          if (cached_state->PrepareForCallback())
+            delegate_->OnMessage(buffer);
+        }));
+  }
+
+  void OnBufferedAmountChange(uint64_t sent_data_size) override {
+    RTC_DCHECK_RUN_ON(network_thread());
+    signaling_thread()->PostTask(SafeTask(
+        safety_.flag(), [this, sent_data_size,
+                         cached_state = std::make_unique<CachedGetters>(this)] {
+          RTC_DCHECK_RUN_ON(signaling_thread());
+          if (cached_state->PrepareForCallback())
+            delegate_->OnBufferedAmountChange(sent_data_size);
+        }));
+  }
+
+  rtc::Thread* signaling_thread() const { return channel_->signaling_thread_; }
+  rtc::Thread* network_thread() const { return channel_->network_thread_; }
+
+  DataChannelObserver* delegate_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
+  SctpDataChannel* const channel_;
+  ScopedTaskSafety safety_;
+  rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety_;
+  CachedGetters* cached_getters_ RTC_GUARDED_BY(signaling_thread()) = nullptr;
+};
+
 // static
 rtc::scoped_refptr<SctpDataChannel> SctpDataChannel::Create(
     rtc::WeakPtr<SctpDataChannelControllerInterface> controller,
@@ -158,10 +287,13 @@
 
 // static
 rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
-    rtc::scoped_refptr<SctpDataChannel> channel) {
+    rtc::scoped_refptr<SctpDataChannel> channel,
+    rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety) {
   // Copy thread params to local variables before `std::move()`.
   auto* signaling_thread = channel->signaling_thread_;
   auto* network_thread = channel->network_thread_;
+  channel->observer_adapter_ = std::make_unique<ObserverAdapter>(
+      channel.get(), std::move(signaling_safety));
   return DataChannelProxy::Create(signaling_thread, network_thread,
                                   std::move(channel));
 }
@@ -175,7 +307,6 @@
     rtc::Thread* network_thread)
     : signaling_thread_(signaling_thread),
       network_thread_(network_thread),
-      id_s_(config.id),
       id_n_(config.id),
       internal_id_(GenerateUniqueId()),
       label_(label),
@@ -208,18 +339,87 @@
 }
 
 SctpDataChannel::~SctpDataChannel() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  if (observer_adapter_)
+    ObserverAdapter::DeleteOnSignalingThread(std::move(observer_adapter_));
 }
 
 void SctpDataChannel::RegisterObserver(DataChannelObserver* observer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  observer_ = observer;
-  DeliverQueuedReceivedData();
+  // Note: at this point, we do not know on which thread we're being called
+  // from since this method bypasses the proxy. On Android in particular,
+  // registration methods are called from unknown threads.
+
+  // Check if we should set up an observer adapter that will make sure that
+  // callbacks are delivered on the signaling thread rather than directly
+  // on the network thread.
+  const auto* current_thread = rtc::Thread::Current();
+  // TODO(webrtc:11547): Eventually all DataChannelObserver implementations
+  // should be called on the network thread and IsOkToCallOnTheNetworkThread().
+  if (!observer->IsOkToCallOnTheNetworkThread()) {
+    RTC_LOG(LS_WARNING) << "DataChannelObserver - adapter needed";
+    auto prepare_observer = [&]() {
+      RTC_DCHECK(observer_adapter_) << "CreateProxy hasn't been called";
+      observer_adapter_->SetDelegate(observer);
+      return observer_adapter_.get();
+    };
+    // Instantiate the adapter in the right context and then substitute the
+    // observer pointer the SctpDataChannel will call back on, with the adapter.
+    if (signaling_thread_ == current_thread) {
+      observer = prepare_observer();
+    } else {
+      observer = signaling_thread_->BlockingCall(std::move(prepare_observer));
+    }
+  }
+
+  // Now do the observer registration on the network thread.
+  auto register_observer = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    observer_ = observer;
+    DeliverQueuedReceivedData();
+  };
+
+  if (network_thread_ == current_thread) {
+    register_observer();
+  } else {
+    network_thread_->BlockingCall(std::move(register_observer));
+  }
 }
 
 void SctpDataChannel::UnregisterObserver() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  observer_ = nullptr;
+  // Note: As with `RegisterObserver`, the proxy is being bypassed.
+  const auto* current_thread = rtc::Thread::Current();
+  // Callers must not be invoking the unregistration from the network thread
+  // (assuming a multi-threaded environment where we have a dedicated network
+  // thread). That would indicate non-network related work happening on the
+  // network thread or that unregistration is being done from within a callback
+  // (without unwinding the stack, which is a requirement).
+  // The network thread is not allowed to make blocking calls to the signaling
+  // thread, so that would blow up if attempted. Since we support an adapter
+  // for observers that are not safe to call on the network thread, we do
+  // need to check+free it on the signaling thread.
+  RTC_DCHECK(current_thread != network_thread_ ||
+             network_thread_ == signaling_thread_);
+
+  auto unregister_observer = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    observer_ = nullptr;
+  };
+
+  if (current_thread == network_thread_) {
+    unregister_observer();
+  } else {
+    network_thread_->BlockingCall(std::move(unregister_observer));
+  }
+
+  auto clear_observer = [&]() {
+    if (observer_adapter_)
+      observer_adapter_->SetDelegate(nullptr);
+  };
+
+  if (current_thread != signaling_thread_) {
+    signaling_thread_->BlockingCall(std::move(clear_observer));
+  } else {
+    clear_observer();
+  }
 }
 
 std::string SctpDataChannel::label() const {
@@ -261,8 +461,11 @@
 }
 
 int SctpDataChannel::id() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return id_s_.stream_id_int();
+  RTC_DCHECK_RUN_ON(network_thread_);
+  // TODO(tommi): Once an ID has been assigned, it won't change (can be
+  // considered const). We could do special handling of this and allow bypassing
+  // the proxy so that we can return a valid id without thread hopping.
+  return id_n_.stream_id_int();
 }
 
 Priority SctpDataChannel::priority() const {
@@ -270,12 +473,12 @@
 }
 
 uint64_t SctpDataChannel::buffered_amount() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return queued_send_data_.byte_count();
 }
 
 void SctpDataChannel::Close() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   if (state_ == kClosing || state_ == kClosed)
     return;
   SetState(kClosing);
@@ -284,40 +487,70 @@
 }
 
 SctpDataChannel::DataState SctpDataChannel::state() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return state_;
+  // Note: The proxy is bypassed for the `state()` accessor. This is to allow
+  // observer callbacks to query what the new state is from within a state
+  // update notification without having to do a blocking call to the network
+  // thread from within a callback. This also makes it so that the returned
+  // state is guaranteed to be the new state that provoked the state change
+  // notification, whereby a blocking call to the network thread might end up
+  // getting put behind other messages on the network thread and eventually
+  // fetch a different state value (since pending messages might cause the
+  // state to change in the meantime).
+  const auto* current_thread = rtc::Thread::Current();
+  if (current_thread == signaling_thread_ && observer_adapter_ &&
+      observer_adapter_->IsInsideCallback()) {
+    return observer_adapter_->cached_state();
+  }
+
+  auto return_state = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    return state_;
+  };
+
+  return current_thread == network_thread_
+             ? return_state()
+             : network_thread_->BlockingCall(std::move(return_state));
 }
 
 RTCError SctpDataChannel::error() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  return error_;
+  const auto* current_thread = rtc::Thread::Current();
+  if (current_thread == signaling_thread_ && observer_adapter_ &&
+      observer_adapter_->IsInsideCallback()) {
+    return observer_adapter_->cached_error();
+  }
+
+  auto return_error = [&] {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    return error_;
+  };
+
+  return current_thread == network_thread_
+             ? return_error()
+             : network_thread_->BlockingCall(std::move(return_error));
 }
 
 uint32_t SctpDataChannel::messages_sent() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return messages_sent_;
 }
 
 uint64_t SctpDataChannel::bytes_sent() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return bytes_sent_;
 }
 
 uint32_t SctpDataChannel::messages_received() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return messages_received_;
 }
 
 uint64_t SctpDataChannel::bytes_received() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   return bytes_received_;
 }
 
 bool SctpDataChannel::Send(const DataBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
-  // thread. Bring buffer management etc to the network thread and keep the
-  // operational state management on the signaling thread.
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (state_ != kOpen) {
     return false;
@@ -335,25 +568,17 @@
   return true;
 }
 
-void SctpDataChannel::SetSctpSid_s(StreamId sid) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  RTC_DCHECK(!id_s_.HasValue());
-  RTC_DCHECK(sid.HasValue());
-  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
-  RTC_DCHECK_EQ(state_, kConnecting);
-
-  id_s_ = sid;
-}
-
 void SctpDataChannel::SetSctpSid_n(StreamId sid) {
   RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DCHECK(!id_n_.HasValue());
   RTC_DCHECK(sid.HasValue());
+  RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
+  RTC_DCHECK_EQ(state_, kConnecting);
   id_n_ = sid;
 }
 
 void SctpDataChannel::OnClosingProcedureStartedRemotely() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   if (state_ != kClosing && state_ != kClosed) {
     // Don't bother sending queued data since the side that initiated the
     // closure wouldn't receive it anyway. See crbug.com/559394 for a lengthy
@@ -369,7 +594,7 @@
 }
 
 void SctpDataChannel::OnClosingProcedureComplete() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   // If the closing procedure is complete, we should have finished sending
   // all pending data and transitioned to kClosing already.
   RTC_DCHECK_EQ(state_, kClosing);
@@ -378,12 +603,12 @@
 }
 
 void SctpDataChannel::OnTransportChannelCreated() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-
+  RTC_DCHECK_RUN_ON(network_thread_);
   connected_to_transport_ = true;
 }
 
 void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   // The SctpTransport is unusable, which could come from multiple reasons:
   // - the SCTP m= section was rejected
   // - the DTLS transport is closed
@@ -392,7 +617,7 @@
 }
 
 DataChannelStats SctpDataChannel::GetStats() const {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
   DataChannelStats stats{internal_id_,        id(),         label(),
                          protocol(),          state(),      messages_sent(),
                          messages_received(), bytes_sent(), bytes_received()};
@@ -401,25 +626,25 @@
 
 void SctpDataChannel::OnDataReceived(DataMessageType type,
                                      const rtc::CopyOnWriteBuffer& payload) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (type == DataMessageType::kControl) {
     if (handshake_state_ != kHandshakeWaitingForAck) {
       // Ignore it if we are not expecting an ACK message.
       RTC_LOG(LS_WARNING)
           << "DataChannel received unexpected CONTROL message, sid = "
-          << id_s_.stream_id_int();
+          << id_n_.stream_id_int();
       return;
     }
     if (ParseDataChannelOpenAckMessage(payload)) {
       // We can send unordered as soon as we receive the ACK message.
       handshake_state_ = kHandshakeReady;
       RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
-                       << id_s_.stream_id_int();
+                       << id_n_.stream_id_int();
     } else {
       RTC_LOG(LS_WARNING)
           << "DataChannel failed to parse OPEN_ACK message, sid = "
-          << id_s_.stream_id_int();
+          << id_n_.stream_id_int();
     }
     return;
   }
@@ -428,7 +653,7 @@
              type == DataMessageType::kText);
 
   RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
-                       << id_s_.stream_id_int();
+                       << id_n_.stream_id_int();
   // We can send unordered as soon as we receive any DATA message since the
   // remote side must have received the OPEN (and old clients do not send
   // OPEN_ACK).
@@ -459,7 +684,7 @@
 }
 
 void SctpDataChannel::OnTransportReady() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   // TODO(bugs.webrtc.org/11547): The transport is configured inside
   // `PeerConnection::SetupDataChannelTransport_n`, which results in
@@ -472,6 +697,7 @@
   // be on for the below `Send*` calls, which currently do a BlockingCall
   // from the signaling thread to the network thread.
   RTC_DCHECK(connected_to_transport_);
+  RTC_DCHECK(id_n_.HasValue());
 
   SendQueuedControlMessages();
   SendQueuedDataMessages();
@@ -480,7 +706,7 @@
 }
 
 void SctpDataChannel::CloseAbruptlyWithError(RTCError error) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_);
 
   if (state_ == kClosed) {
     return;
@@ -501,13 +727,14 @@
 
 void SctpDataChannel::CloseAbruptlyWithDataChannelFailure(
     const std::string& message) {
+  RTC_DCHECK_RUN_ON(network_thread_);
   RTCError error(RTCErrorType::OPERATION_ERROR_WITH_DATA, message);
   error.set_error_detail(RTCErrorDetailType::DATA_CHANNEL_FAILURE);
   CloseAbruptlyWithError(std::move(error));
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::UpdateState() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   // UpdateState determines what to do from a few state variables. Include
   // all conditions required for each state transition here for
   // clarity. OnTransportReady(true) will send any queued data and then invoke
@@ -535,7 +762,7 @@
           DeliverQueuedReceivedData();
         }
       } else {
-        RTC_DCHECK(!id_s_.HasValue());
+        RTC_DCHECK(!id_n_.HasValue());
       }
       break;
     }
@@ -551,11 +778,9 @@
           // to complete; after calling RemoveSctpDataStream,
           // OnClosingProcedureComplete will end up called asynchronously
           // afterwards.
-          if (!started_closing_procedure_ && id_s_.HasValue()) {
+          if (!started_closing_procedure_ && id_n_.HasValue()) {
             started_closing_procedure_ = true;
-            network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
-              c->RemoveSctpDataStream(sid);
-            });
+            controller_->RemoveSctpDataStream(id_n_);
           }
         }
       } else {
@@ -572,8 +797,8 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SetState(DataState state) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == state) {
     return;
   }
@@ -587,8 +812,8 @@
     controller_->OnChannelStateChanged(this, state_);
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::DeliverQueuedReceivedData() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (!observer_) {
     return;
   }
@@ -601,8 +826,8 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedDataMessages() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (queued_send_data_.Empty()) {
     return;
   }
@@ -619,9 +844,9 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendDataMessage(const DataBuffer& buffer,
                                       bool queue_if_blocked) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   SendDataParams send_params;
   if (!controller_) {
     return false;
@@ -641,7 +866,7 @@
   send_params.type =
       buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
 
-  RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
+  RTCError error = controller_->SendData(id_n_, send_params, buffer.data);
 
   if (error.ok()) {
     ++messages_sent_;
@@ -669,8 +894,8 @@
   return false;
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   size_t start_buffered_amount = queued_send_data_.byte_count();
   if (start_buffered_amount + buffer.size() >
       DataChannelInterface::MaxSendQueueSize()) {
@@ -681,8 +906,8 @@
   return true;
 }
 
+// RTC_RUN_ON(network_thread_).
 void SctpDataChannel::SendQueuedControlMessages() {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   PacketQueue control_packets;
   control_packets.Swap(&queued_control_data_);
 
@@ -692,10 +917,10 @@
   }
 }
 
+// RTC_RUN_ON(network_thread_).
 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(connected_to_transport_);
-  RTC_DCHECK(id_s_.HasValue());
+  RTC_DCHECK(id_n_.HasValue());
   RTC_DCHECK(controller_);
 
   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
@@ -708,10 +933,10 @@
   send_params.ordered = ordered_ || is_open_message;
   send_params.type = DataMessageType::kControl;
 
-  RTCError err = controller_->SendData(id_s_, send_params, buffer);
+  RTCError err = controller_->SendData(id_n_, send_params, buffer);
   if (err.ok()) {
     RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
-                         << id_s_.stream_id_int();
+                         << id_n_.stream_id_int();
 
     if (handshake_state_ == kHandshakeShouldSendAck) {
       handshake_state_ = kHandshakeReady;
@@ -735,10 +960,4 @@
   g_unique_id = new_value;
 }
 
-SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
-    DataChannelInterface* channel) {
-  return static_cast<SctpDataChannel*>(
-      static_cast<DataChannelProxy*>(channel)->internal());
-}
-
 }  // namespace webrtc
diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h
index 588b0cb..50d5883 100644
--- a/pc/sctp_data_channel.h
+++ b/pc/sctp_data_channel.h
@@ -39,9 +39,7 @@
 class SctpDataChannel;
 
 // Interface that acts as a bridge from the data channel to the transport.
-// TODO(bugs.webrtc.org/11547): The transport operates on the network thread
-// and ultimately all the methods in this interface need to be invoked on the
-// network thread. Currently, some are called on the signaling thread.
+// All methods in this interface need to be invoked on the network thread.
 class SctpDataChannelControllerInterface {
  public:
   // Sends the data to the transport.
@@ -49,11 +47,9 @@
                             const SendDataParams& params,
                             const rtc::CopyOnWriteBuffer& payload) = 0;
   // Adds the data channel SID to the transport for SCTP.
-  // Note: Must be called on the network thread.
   virtual void AddSctpDataStream(StreamId sid) = 0;
   // Begins the closing procedure by sending an outgoing stream reset. Still
   // need to wait for callbacks to tell when this completes.
-  // Note: Must be called on the network thread.
   virtual void RemoveSctpDataStream(StreamId sid) = 0;
   // Notifies the controller of state changes.
   virtual void OnChannelStateChanged(SctpDataChannel* data_channel,
@@ -139,8 +135,14 @@
 
   // Instantiates an API proxy for a SctpDataChannel instance that will be
   // handed out to external callers.
+  // The `signaling_safety` flag is used for the ObserverAdapter callback proxy
+  // which delivers callbacks on the signaling thread but must not deliver such
+  // callbacks after the peerconnection has been closed. The data controller
+  // will update the flag when closed, which will cancel any pending event
+  // notifications.
   static rtc::scoped_refptr<DataChannelInterface> CreateProxy(
-      rtc::scoped_refptr<SctpDataChannel> channel);
+      rtc::scoped_refptr<SctpDataChannel> channel,
+      rtc::scoped_refptr<PendingTaskSafetyFlag> signaling_safety);
 
   void RegisterObserver(DataChannelObserver* observer) override;
   void UnregisterObserver() override;
@@ -192,7 +194,6 @@
 
   // Sets the SCTP sid and adds to transport layer if not set yet. Should only
   // be called once.
-  void SetSctpSid_s(StreamId sid);
   void SetSctpSid_n(StreamId sid);
 
   // The remote side started the closing procedure by resetting its outgoing
@@ -216,10 +217,6 @@
   // stats purposes (see also `GetStats()`).
   int internal_id() const { return internal_id_; }
 
-  StreamId sid_s() const {
-    RTC_DCHECK_RUN_ON(signaling_thread_);
-    return id_s_;
-  }
   StreamId sid_n() const {
     RTC_DCHECK_RUN_ON(network_thread_);
     return id_n_;
@@ -239,6 +236,8 @@
   ~SctpDataChannel() override;
 
  private:
+  class ObserverAdapter;
+
   // The OPEN(_ACK) signaling state.
   enum HandshakeState {
     kHandshakeInit,
@@ -248,21 +247,23 @@
     kHandshakeReady
   };
 
-  void UpdateState();
-  void SetState(DataState state);
+  void UpdateState() RTC_RUN_ON(network_thread_);
+  void SetState(DataState state) RTC_RUN_ON(network_thread_);
 
-  void DeliverQueuedReceivedData();
+  void DeliverQueuedReceivedData() RTC_RUN_ON(network_thread_);
 
-  void SendQueuedDataMessages();
-  bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked);
-  bool QueueSendDataMessage(const DataBuffer& buffer);
+  void SendQueuedDataMessages() RTC_RUN_ON(network_thread_);
+  bool SendDataMessage(const DataBuffer& buffer, bool queue_if_blocked)
+      RTC_RUN_ON(network_thread_);
+  bool QueueSendDataMessage(const DataBuffer& buffer)
+      RTC_RUN_ON(network_thread_);
 
-  void SendQueuedControlMessages();
-  bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
+  void SendQueuedControlMessages() RTC_RUN_ON(network_thread_);
+  bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer)
+      RTC_RUN_ON(network_thread_);
 
   rtc::Thread* const signaling_thread_;
   rtc::Thread* const network_thread_;
-  StreamId id_s_ RTC_GUARDED_BY(signaling_thread_);
   StreamId id_n_ RTC_GUARDED_BY(network_thread_);
   const int internal_id_;
   const std::string label_;
@@ -273,32 +274,28 @@
   const bool negotiated_;
   const bool ordered_;
 
-  DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_) = nullptr;
-  DataState state_ RTC_GUARDED_BY(signaling_thread_) = kConnecting;
-  RTCError error_ RTC_GUARDED_BY(signaling_thread_);
-  uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
-  uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_) = 0;
-  uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
-  uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_) = 0;
+  DataChannelObserver* observer_ RTC_GUARDED_BY(network_thread_) = nullptr;
+  std::unique_ptr<ObserverAdapter> observer_adapter_;
+  DataState state_ RTC_GUARDED_BY(network_thread_) = kConnecting;
+  RTCError error_ RTC_GUARDED_BY(network_thread_);
+  uint32_t messages_sent_ RTC_GUARDED_BY(network_thread_) = 0;
+  uint64_t bytes_sent_ RTC_GUARDED_BY(network_thread_) = 0;
+  uint32_t messages_received_ RTC_GUARDED_BY(network_thread_) = 0;
+  uint64_t bytes_received_ RTC_GUARDED_BY(network_thread_) = 0;
   rtc::WeakPtr<SctpDataChannelControllerInterface> controller_
-      RTC_GUARDED_BY(signaling_thread_);
-  HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_) =
+      RTC_GUARDED_BY(network_thread_);
+  HandshakeState handshake_state_ RTC_GUARDED_BY(network_thread_) =
       kHandshakeInit;
-  bool connected_to_transport_ RTC_GUARDED_BY(signaling_thread_) = false;
+  bool connected_to_transport_ RTC_GUARDED_BY(network_thread_) = false;
   // Did we already start the graceful SCTP closing procedure?
-  bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
+  bool started_closing_procedure_ RTC_GUARDED_BY(network_thread_) = false;
   // Control messages that always have to get sent out before any queued
   // data.
-  PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
-  PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
-  PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
+  PacketQueue queued_control_data_ RTC_GUARDED_BY(network_thread_);
+  PacketQueue queued_received_data_ RTC_GUARDED_BY(network_thread_);
+  PacketQueue queued_send_data_ RTC_GUARDED_BY(network_thread_);
 };
 
-// Downcast a PeerConnectionInterface that points to a proxy object
-// to its underlying SctpDataChannel object. For testing only.
-SctpDataChannel* DowncastProxiedDataChannelInterfaceToSctpDataChannelForTesting(
-    DataChannelInterface* channel);
-
 }  // namespace webrtc
 
 #endif  // PC_SCTP_DATA_CHANNEL_H_
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index c382d61..90a6cd2 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -3268,20 +3268,16 @@
     return;
   }
 
-  absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
-    RTC_DCHECK_RUN_ON(network_thread());
-    return pc_->GetSctpSslRole_n();
-  });
-
-  if (!role) {
-    role = GuessSslRole();
-  }
-
-  if (role) {
-    // TODO(webrtc:11547): Make this call on the network thread too once
-    // `AllocateSctpSids` has been updated.
-    data_channel_controller()->AllocateSctpSids(*role);
-  }
+  absl::optional<rtc::SSLRole> guessed_role = GuessSslRole();
+  network_thread()->BlockingCall(
+      [&, data_channel_controller = data_channel_controller()] {
+        RTC_DCHECK_RUN_ON(network_thread());
+        absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
+        if (!role)
+          role = guessed_role;
+        if (role)
+          data_channel_controller->AllocateSctpSids(*role);
+      });
 }
 
 absl::optional<rtc::SSLRole> SdpOfferAnswerHandler::GuessSslRole() const {
@@ -5117,13 +5113,13 @@
   RTC_DCHECK_RUN_ON(signaling_thread());
   const bool has_sctp = pc_->sctp_mid().has_value();
 
-  if (has_sctp)
-    data_channel_controller()->OnTransportChannelClosed(error);
-
-  context_->network_thread()->BlockingCall([this] {
-    RTC_DCHECK_RUN_ON(context_->network_thread());
-    pc_->TeardownDataChannelTransport_n();
-  });
+  context_->network_thread()->BlockingCall(
+      [&, data_channel_controller = data_channel_controller()] {
+        RTC_DCHECK_RUN_ON(context_->network_thread());
+        if (has_sctp)
+          data_channel_controller->OnTransportChannelClosed(error);
+        pc_->TeardownDataChannelTransport_n();
+      });
 
   if (has_sctp)
     pc_->ResetSctpDataMid();
diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h
index 26ecc31..d1b41fc 100644
--- a/pc/test/fake_data_channel_controller.h
+++ b/pc/test/fake_data_channel_controller.h
@@ -29,23 +29,31 @@
         transport_available_(false),
         ready_to_send_(false),
         transport_error_(false) {}
-  virtual ~FakeDataChannelController() {}
+
+  ~FakeDataChannelController() override {
+    network_thread_->BlockingCall([&] {
+      RTC_DCHECK_RUN_ON(network_thread_);
+      weak_factory_.InvalidateWeakPtrs();
+    });
+  }
 
   rtc::WeakPtr<FakeDataChannelController> weak_ptr() {
+    RTC_DCHECK_RUN_ON(network_thread_);
     return weak_factory_.GetWeakPtr();
   }
 
   rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
       absl::string_view label,
       webrtc::InternalDataChannelInit init) {
-    rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
-    // Explicitly associate the weak ptr instance with the current thread to
-    // catch early any inappropriate referencing of it on the network thread.
-    RTC_CHECK(my_weak_ptr);
-
     rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
         network_thread_->BlockingCall([&]() {
           RTC_DCHECK_RUN_ON(network_thread_);
+          rtc::WeakPtr<FakeDataChannelController> my_weak_ptr = weak_ptr();
+          // Explicitly associate the weak ptr instance with the current thread
+          // to catch early any inappropriate referencing of it on the network
+          // thread.
+          RTC_CHECK(my_weak_ptr);
+
           rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
               webrtc::SctpDataChannel::Create(
                   std::move(my_weak_ptr), std::string(label),
@@ -54,17 +62,16 @@
           if (transport_available_ && channel->sid_n().HasValue()) {
             AddSctpDataStream(channel->sid_n());
           }
+          if (ready_to_send_) {
+            network_thread_->PostTask([channel = channel] {
+              if (channel->state() !=
+                  webrtc::DataChannelInterface::DataState::kClosed) {
+                channel->OnTransportReady();
+              }
+            });
+          }
           return channel;
         });
-    if (ready_to_send_) {
-      signaling_thread_->PostTask(
-          SafeTask(signaling_safety_.flag(), [channel = channel] {
-            if (channel->state() !=
-                webrtc::DataChannelInterface::DataState::kClosed) {
-              channel->OnTransportReady();
-            }
-          }));
-    }
     connected_channels_.insert(channel.get());
     return channel;
   }
@@ -72,6 +79,7 @@
   webrtc::RTCError SendData(webrtc::StreamId sid,
                             const webrtc::SendDataParams& params,
                             const rtc::CopyOnWriteBuffer& payload) override {
+    RTC_DCHECK_RUN_ON(network_thread_);
     RTC_CHECK(ready_to_send_);
     RTC_CHECK(transport_available_);
     if (send_blocked_) {
@@ -100,17 +108,14 @@
     RTC_DCHECK_RUN_ON(network_thread_);
     RTC_CHECK(sid.HasValue());
     known_stream_ids_.erase(sid);
-    signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
-      // Unlike the real SCTP transport, act like the closing procedure finished
-      // instantly.
-      auto it = absl::c_find_if(connected_channels_, [&](const auto* c) {
-        return c->sid_s() == sid;
-      });
-      // This path mimics the DCC's OnChannelClosed handler since the FDCC
-      // (this class) doesn't have a transport that would do that.
-      if (it != connected_channels_.end())
-        (*it)->OnClosingProcedureComplete();
-    }));
+    // Unlike the real SCTP transport, act like the closing procedure finished
+    // instantly.
+    auto it = absl::c_find_if(connected_channels_,
+                              [&](const auto* c) { return c->sid_n() == sid; });
+    // This path mimics the DCC's OnChannelClosed handler since the FDCC
+    // (this class) doesn't have a transport that would do that.
+    if (it != connected_channels_.end())
+      (*it)->OnClosingProcedureComplete();
   }
 
   void OnChannelStateChanged(
@@ -126,36 +131,40 @@
 
   // Set true to emulate the SCTP stream being blocked by congestion control.
   void set_send_blocked(bool blocked) {
-    send_blocked_ = blocked;
-    if (!blocked) {
-      RTC_CHECK(transport_available_);
-      // Make a copy since `connected_channels_` may change while
-      // OnTransportReady is called.
-      auto copy = connected_channels_;
-      for (webrtc::SctpDataChannel* ch : copy) {
-        ch->OnTransportReady();
+    network_thread_->BlockingCall([&]() {
+      send_blocked_ = blocked;
+      if (!blocked) {
+        RTC_CHECK(transport_available_);
+        // Make a copy since `connected_channels_` may change while
+        // OnTransportReady is called.
+        auto copy = connected_channels_;
+        for (webrtc::SctpDataChannel* ch : copy) {
+          ch->OnTransportReady();
+        }
       }
-    }
+    });
   }
 
   // Set true to emulate the transport channel creation, e.g. after
   // setLocalDescription/setRemoteDescription called with data content.
   void set_transport_available(bool available) {
-    transport_available_ = available;
+    network_thread_->BlockingCall([&]() { transport_available_ = available; });
   }
 
   // Set true to emulate the transport OnTransportReady signal when the
   // transport becomes writable for the first time.
   void set_ready_to_send(bool ready) {
     RTC_CHECK(transport_available_);
-    ready_to_send_ = ready;
-    if (ready) {
-      std::set<webrtc::SctpDataChannel*>::iterator it;
-      for (it = connected_channels_.begin(); it != connected_channels_.end();
-           ++it) {
-        (*it)->OnTransportReady();
+    network_thread_->BlockingCall([&]() {
+      ready_to_send_ = ready;
+      if (ready) {
+        std::set<webrtc::SctpDataChannel*>::iterator it;
+        for (it = connected_channels_.begin(); it != connected_channels_.end();
+             ++it) {
+          (*it)->OnTransportReady();
+        }
       }
-    }
+    });
   }
 
   void set_transport_error() { transport_error_ = true; }