Inject signaling and network threads to DataChannel.

Add a few DCHECKs and comments about upcoming work.

Bug: webrtc:11547
Change-Id: I2d42f48cb93f31e70cf9fe4b3b62241c38bc9d8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177106
Reviewed-by: Taylor <deadbeef@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31530}
diff --git a/pc/data_channel.cc b/pc/data_channel.cc
index 795bf8d..e4f658c 100644
--- a/pc/data_channel.cc
+++ b/pc/data_channel.cc
@@ -137,9 +137,12 @@
     DataChannelProviderInterface* provider,
     cricket::DataChannelType dct,
     const std::string& label,
-    const InternalDataChannelInit& config) {
+    const InternalDataChannelInit& config,
+    rtc::Thread* signaling_thread,
+    rtc::Thread* network_thread) {
   rtc::scoped_refptr<DataChannel> channel(
-      new rtc::RefCountedObject<DataChannel>(config, provider, dct, label));
+      new rtc::RefCountedObject<DataChannel>(config, provider, dct, label,
+                                             signaling_thread, network_thread));
   if (!channel->Init()) {
     return nullptr;
   }
@@ -155,8 +158,12 @@
 DataChannel::DataChannel(const InternalDataChannelInit& config,
                          DataChannelProviderInterface* provider,
                          cricket::DataChannelType dct,
-                         const std::string& label)
-    : internal_id_(GenerateUniqueId()),
+                         const std::string& label,
+                         rtc::Thread* signaling_thread,
+                         rtc::Thread* network_thread)
+    : signaling_thread_(signaling_thread),
+      network_thread_(network_thread),
+      internal_id_(GenerateUniqueId()),
       label_(label),
       config_(config),
       observer_(nullptr),
@@ -174,9 +181,12 @@
       receive_ssrc_set_(false),
       writable_(false),
       send_ssrc_(0),
-      receive_ssrc_(0) {}
+      receive_ssrc_(0) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+}
 
 bool DataChannel::Init() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (data_channel_type_ == cricket::DCT_RTP) {
     if (config_.reliable || config_.id != -1 || config_.maxRetransmits ||
         config_.maxRetransmitTime) {
@@ -229,18 +239,23 @@
   return true;
 }
 
-DataChannel::~DataChannel() {}
+DataChannel::~DataChannel() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+}
 
 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   observer_ = observer;
   DeliverQueuedReceivedData();
 }
 
 void DataChannel::UnregisterObserver() {
-  observer_ = NULL;
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  observer_ = nullptr;
 }
 
 bool DataChannel::reliable() const {
+  // May be called on any thread.
   if (data_channel_type_ == cricket::DCT_RTP) {
     return false;
   } else {
@@ -249,10 +264,12 @@
 }
 
 uint64_t DataChannel::buffered_amount() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return buffered_amount_;
 }
 
 void DataChannel::Close() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == kClosed)
     return;
   send_ssrc_ = 0;
@@ -262,11 +279,42 @@
   UpdateState();
 }
 
+DataChannel::DataState DataChannel::state() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return state_;
+}
+
 RTCError DataChannel::error() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   return error_;
 }
 
+uint32_t DataChannel::messages_sent() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return messages_sent_;
+}
+
+uint64_t DataChannel::bytes_sent() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return bytes_sent_;
+}
+
+uint32_t DataChannel::messages_received() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return messages_received_;
+}
+
+uint64_t DataChannel::bytes_received() const {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return bytes_received_;
+}
+
 bool DataChannel::Send(const DataBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
+  // thread. Bring buffer management etc to the network thread and keep the
+  // operational state management on the signaling thread.
+
   buffered_amount_ += buffer.size();
   if (state_ != kOpen) {
     return false;
@@ -306,6 +354,7 @@
 }
 
 void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
 
   if (receive_ssrc_set_) {
@@ -329,6 +378,7 @@
 }
 
 void DataChannel::OnClosingProcedureStartedRemotely(int sid) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (IsSctpLike(data_channel_type_) && sid == config_.id &&
       state_ != kClosing && state_ != kClosed) {
     // Don't bother sending queued data since the side that initiated the
@@ -345,6 +395,7 @@
 }
 
 void DataChannel::OnClosingProcedureComplete(int sid) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (IsSctpLike(data_channel_type_) && sid == config_.id) {
     // If the closing procedure is complete, we should have finished sending
     // all pending data and transitioned to kClosing already.
@@ -356,6 +407,7 @@
 }
 
 void DataChannel::OnTransportChannelCreated() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(IsSctpLike(data_channel_type_));
   if (!connected_to_provider_) {
     connected_to_provider_ = provider_->ConnectDataChannel(this);
@@ -385,6 +437,7 @@
 }
 
 void DataChannel::SetSendSsrc(uint32_t send_ssrc) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
   if (send_ssrc_set_) {
     return;
@@ -396,6 +449,7 @@
 
 void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
                                  const rtc::CopyOnWriteBuffer& payload) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (data_channel_type_ == cricket::DCT_RTP && params.ssrc != receive_ssrc_) {
     return;
   }
@@ -462,6 +516,8 @@
 }
 
 void DataChannel::OnChannelReady(bool writable) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+
   writable_ = writable;
   if (!writable) {
     return;
@@ -473,6 +529,8 @@
 }
 
 void DataChannel::CloseAbruptlyWithError(RTCError error) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+
   if (state_ == kClosed) {
     return;
   }
@@ -501,6 +559,7 @@
 }
 
 void DataChannel::UpdateState() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   // UpdateState determines what to do from a few state variables.  Include
   // all conditions required for each state transition here for
   // clarity. OnChannelReady(true) will send any queued data and then invoke
@@ -568,6 +627,7 @@
 }
 
 void DataChannel::SetState(DataState state) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (state_ == state) {
     return;
   }
@@ -584,6 +644,7 @@
 }
 
 void DataChannel::DisconnectFromProvider() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (!connected_to_provider_)
     return;
 
@@ -592,6 +653,7 @@
 }
 
 void DataChannel::DeliverQueuedReceivedData() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (!observer_) {
     return;
   }
@@ -605,6 +667,7 @@
 }
 
 void DataChannel::SendQueuedDataMessages() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   if (queued_send_data_.Empty()) {
     return;
   }
@@ -623,6 +686,7 @@
 
 bool DataChannel::SendDataMessage(const DataBuffer& buffer,
                                   bool queue_if_blocked) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   cricket::SendDataParams send_params;
 
   if (IsSctpLike(data_channel_type_)) {
@@ -681,6 +745,7 @@
 }
 
 bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   size_t start_buffered_amount = queued_send_data_.byte_count();
   if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
     RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
@@ -691,6 +756,7 @@
 }
 
 void DataChannel::SendQueuedControlMessages() {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   PacketQueue control_packets;
   control_packets.Swap(&queued_control_data_);
 
@@ -701,10 +767,12 @@
 }
 
 void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
 }
 
 bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
+  RTC_DCHECK_RUN_ON(signaling_thread_);
   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
 
   RTC_DCHECK(IsSctpLike(data_channel_type_));
diff --git a/pc/data_channel.h b/pc/data_channel.h
index 1ee2679..e843250 100644
--- a/pc/data_channel.h
+++ b/pc/data_channel.h
@@ -117,47 +117,51 @@
       DataChannelProviderInterface* provider,
       cricket::DataChannelType dct,
       const std::string& label,
-      const InternalDataChannelInit& config);
+      const InternalDataChannelInit& config,
+      rtc::Thread* signaling_thread,
+      rtc::Thread* network_thread);
 
   static bool IsSctpLike(cricket::DataChannelType type);
 
-  virtual void RegisterObserver(DataChannelObserver* observer);
-  virtual void UnregisterObserver();
+  void RegisterObserver(DataChannelObserver* observer) override;
+  void UnregisterObserver() override;
 
-  virtual std::string label() const { return label_; }
-  virtual bool reliable() const;
-  virtual bool ordered() const { return config_.ordered; }
+  std::string label() const override { return label_; }
+  bool reliable() const override;
+  bool ordered() const override { return config_.ordered; }
   // Backwards compatible accessors
-  virtual uint16_t maxRetransmitTime() const {
+  uint16_t maxRetransmitTime() const override {
     return config_.maxRetransmitTime ? *config_.maxRetransmitTime
                                      : static_cast<uint16_t>(-1);
   }
-  virtual uint16_t maxRetransmits() const {
+  uint16_t maxRetransmits() const override {
     return config_.maxRetransmits ? *config_.maxRetransmits
                                   : static_cast<uint16_t>(-1);
   }
-  virtual absl::optional<int> maxPacketLifeTime() const {
+  absl::optional<int> maxPacketLifeTime() const override {
     return config_.maxRetransmitTime;
   }
-  virtual absl::optional<int> maxRetransmitsOpt() const {
+  absl::optional<int> maxRetransmitsOpt() const override {
     return config_.maxRetransmits;
   }
-  virtual std::string protocol() const { return config_.protocol; }
-  virtual bool negotiated() const { return config_.negotiated; }
-  virtual int id() const { return config_.id; }
-  virtual Priority priority() const {
+  std::string protocol() const override { return config_.protocol; }
+  bool negotiated() const override { return config_.negotiated; }
+  int id() const override { return config_.id; }
+  Priority priority() const override {
     return config_.priority ? *config_.priority : Priority::kLow;
   }
+
   virtual int internal_id() const { return internal_id_; }
-  virtual uint64_t buffered_amount() const;
-  virtual void Close();
-  virtual DataState state() const { return state_; }
-  virtual RTCError error() const;
-  virtual uint32_t messages_sent() const { return messages_sent_; }
-  virtual uint64_t bytes_sent() const { return bytes_sent_; }
-  virtual uint32_t messages_received() const { return messages_received_; }
-  virtual uint64_t bytes_received() const { return bytes_received_; }
-  virtual bool Send(const DataBuffer& buffer);
+
+  uint64_t buffered_amount() const override;
+  void Close() override;
+  DataState state() const override;
+  RTCError error() const override;
+  uint32_t messages_sent() const override;
+  uint64_t bytes_sent() const override;
+  uint32_t messages_received() const override;
+  uint64_t bytes_received() const override;
+  bool Send(const DataBuffer& buffer) override;
 
   // Close immediately, ignoring any queued data or closing procedure.
   // This is called for RTP data channels when SDP indicates a channel should
@@ -234,8 +238,10 @@
   DataChannel(const InternalDataChannelInit& config,
               DataChannelProviderInterface* client,
               cricket::DataChannelType dct,
-              const std::string& label);
-  virtual ~DataChannel();
+              const std::string& label,
+              rtc::Thread* signaling_thread,
+              rtc::Thread* network_thread);
+  ~DataChannel() override;
 
  private:
   // A packet queue which tracks the total queued bytes. Queued packets are
@@ -284,36 +290,38 @@
   void QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer);
   bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
 
+  rtc::Thread* const signaling_thread_;
+  rtc::Thread* const network_thread_;
   const int internal_id_;
   const std::string label_;
   const InternalDataChannelInit config_;
-  DataChannelObserver* observer_;
-  DataState state_;
-  RTCError error_;
-  uint32_t messages_sent_;
-  uint64_t bytes_sent_;
-  uint32_t messages_received_;
-  uint64_t bytes_received_;
+  DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_);
+  DataState state_ RTC_GUARDED_BY(signaling_thread_);
+  RTCError error_ RTC_GUARDED_BY(signaling_thread_);
+  uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_);
+  uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_);
+  uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_);
+  uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_);
   // Number of bytes of data that have been queued using Send(). Increased
   // before each transport send and decreased after each successful send.
-  uint64_t buffered_amount_;
+  uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_);
   const cricket::DataChannelType data_channel_type_;
-  DataChannelProviderInterface* provider_;
-  HandshakeState handshake_state_;
-  bool connected_to_provider_;
-  bool send_ssrc_set_;
-  bool receive_ssrc_set_;
-  bool writable_;
+  DataChannelProviderInterface* const provider_;
+  HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_);
+  bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_);
+  bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
+  bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
+  bool writable_ RTC_GUARDED_BY(signaling_thread_);
   // Did we already start the graceful SCTP closing procedure?
-  bool started_closing_procedure_ = false;
-  uint32_t send_ssrc_;
-  uint32_t receive_ssrc_;
+  bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
+  uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_);
+  uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_);
   // Control messages that always have to get sent out before any queued
   // data.
-  PacketQueue queued_control_data_;
-  PacketQueue queued_received_data_;
-  PacketQueue queued_send_data_;
-  rtc::AsyncInvoker invoker_;
+  PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
+  PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
+  PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
+  rtc::AsyncInvoker invoker_ RTC_GUARDED_BY(signaling_thread_);
 };
 
 // Define proxy for DataChannelInterface.
@@ -341,6 +349,7 @@
 PROXY_CONSTMETHOD0(uint64_t, bytes_received)
 PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
 PROXY_METHOD0(void, Close)
+// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
 PROXY_METHOD1(bool, Send, const DataBuffer&)
 END_PROXY_MAP()
 
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index e9ea742..9891d50 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -25,37 +25,10 @@
 bool DataChannelController::SendData(const cricket::SendDataParams& params,
                                      const rtc::CopyOnWriteBuffer& payload,
                                      cricket::SendDataResult* result) {
-  // RTC_DCHECK_RUN_ON(signaling_thread());
-  if (data_channel_transport()) {
-    SendDataParams send_params;
-    send_params.type = ToWebrtcDataMessageType(params.type);
-    send_params.ordered = params.ordered;
-    if (params.max_rtx_count >= 0) {
-      send_params.max_rtx_count = params.max_rtx_count;
-    } else if (params.max_rtx_ms >= 0) {
-      send_params.max_rtx_ms = params.max_rtx_ms;
-    }
-
-    RTCError error = network_thread()->Invoke<RTCError>(
-        RTC_FROM_HERE, [this, params, send_params, payload] {
-          return data_channel_transport()->SendData(params.sid, send_params,
-                                                    payload);
-        });
-
-    if (error.ok()) {
-      *result = cricket::SendDataResult::SDR_SUCCESS;
-      return true;
-    } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
-      // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
-      // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
-      *result = cricket::SendDataResult::SDR_BLOCK;
-      return false;
-    }
-    *result = cricket::SendDataResult::SDR_ERROR;
-    return false;
-  } else if (rtp_data_channel()) {
+  if (data_channel_transport())
+    return DataChannelSendData(params, payload, result);
+  if (rtp_data_channel())
     return rtp_data_channel()->SendData(params, payload, result);
-  }
   RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
   return false;
 }
@@ -146,6 +119,14 @@
   data_channel_transport_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
         RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): The data being received should be
+        // delivered on the network thread. The way HandleOpenMessage_s works
+        // right now is that it's called for all types of buffers and operates
+        // as a selector function. Change this so that it's only called for
+        // buffers that it should be able to handle. Once we do that, we can
+        // deliver all other buffers on the network thread (change
+        // SignalDataChannelTransportReceivedData_s to
+        // SignalDataChannelTransportReceivedData_n).
         if (!HandleOpenMessage_s(params, buffer)) {
           SignalDataChannelTransportReceivedData_s(params, buffer);
         }
@@ -261,6 +242,7 @@
     return;
   }
 
+  // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
       DataChannelProxy::Create(signaling_thread(), channel);
   pc_->Observer()->OnDataChannel(std::move(proxy_channel));
@@ -299,7 +281,8 @@
   }
 
   rtc::scoped_refptr<DataChannel> channel(
-      DataChannel::Create(this, data_channel_type(), label, new_config));
+      DataChannel::Create(this, data_channel_type(), label, new_config,
+                          signaling_thread(), network_thread()));
   if (!channel) {
     sid_allocator_.ReleaseSid(new_config.id);
     return nullptr;
@@ -424,9 +407,10 @@
 
 void DataChannelController::UpdateRemoteRtpDataChannels(
     const cricket::StreamParamsVec& streams) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+
   std::vector<std::string> existing_channels;
 
-  RTC_DCHECK_RUN_ON(signaling_thread());
   // Find new and active data channels.
   for (const cricket::StreamParams& params : streams) {
     // The data channel label is either the mslabel or the SSRC if the mslabel
@@ -447,6 +431,44 @@
   UpdateClosingRtpDataChannels(existing_channels, false);
 }
 
+cricket::DataChannelType DataChannelController::data_channel_type() const {
+  // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
+  // RTC_DCHECK_RUN_ON(signaling_thread());
+  return data_channel_type_;
+}
+
+void DataChannelController::set_data_channel_type(
+    cricket::DataChannelType type) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  data_channel_type_ = type;
+}
+
+DataChannelTransportInterface* DataChannelController::data_channel_transport()
+    const {
+  // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
+  // network thread.
+  // RTC_DCHECK_RUN_ON(network_thread());
+  return data_channel_transport_;
+}
+
+void DataChannelController::set_data_channel_transport(
+    DataChannelTransportInterface* transport) {
+  RTC_DCHECK_RUN_ON(network_thread());
+  data_channel_transport_ = transport;
+}
+
+const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
+DataChannelController::rtp_data_channels() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return &rtp_data_channels_;
+}
+
+const std::vector<rtc::scoped_refptr<DataChannel>>*
+DataChannelController::sctp_data_channels() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return &sctp_data_channels_;
+}
+
 void DataChannelController::UpdateClosingRtpDataChannels(
     const std::vector<std::string>& active_channels,
     bool is_local_update) {
@@ -483,11 +505,50 @@
     return;
   }
   channel->SetReceiveSsrc(remote_ssrc);
+  // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
       DataChannelProxy::Create(signaling_thread(), channel);
   pc_->Observer()->OnDataChannel(std::move(proxy_channel));
 }
 
+bool DataChannelController::DataChannelSendData(
+    const cricket::SendDataParams& params,
+    const rtc::CopyOnWriteBuffer& payload,
+    cricket::SendDataResult* result) {
+  // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+  // thread instead. Remove the Invoke() below and move associated state to
+  // the network thread.
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK(data_channel_transport());
+
+  SendDataParams send_params;
+  send_params.type = ToWebrtcDataMessageType(params.type);
+  send_params.ordered = params.ordered;
+  if (params.max_rtx_count >= 0) {
+    send_params.max_rtx_count = params.max_rtx_count;
+  } else if (params.max_rtx_ms >= 0) {
+    send_params.max_rtx_ms = params.max_rtx_ms;
+  }
+
+  RTCError error = network_thread()->Invoke<RTCError>(
+      RTC_FROM_HERE, [this, params, send_params, payload] {
+        return data_channel_transport()->SendData(params.sid, send_params,
+                                                  payload);
+      });
+
+  if (error.ok()) {
+    *result = cricket::SendDataResult::SDR_SUCCESS;
+    return true;
+  } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
+    // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
+    // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
+    *result = cricket::SendDataResult::SDR_BLOCK;
+    return false;
+  }
+  *result = cricket::SendDataResult::SDR_ERROR;
+  return false;
+}
+
 rtc::Thread* DataChannelController::network_thread() const {
   return pc_->network_thread();
 }
diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h
index 60bcbb3..156bbe5 100644
--- a/pc/data_channel_controller.h
+++ b/pc/data_channel_controller.h
@@ -89,34 +89,20 @@
   void UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec& streams);
 
   // Accessors
-  cricket::DataChannelType data_channel_type() const {
-    return data_channel_type_;
-  }
-  void set_data_channel_type(cricket::DataChannelType type) {
-    data_channel_type_ = type;
-  }
+  cricket::DataChannelType data_channel_type() const;
+  void set_data_channel_type(cricket::DataChannelType type);
   cricket::RtpDataChannel* rtp_data_channel() const {
     return rtp_data_channel_;
   }
   void set_rtp_data_channel(cricket::RtpDataChannel* channel) {
     rtp_data_channel_ = channel;
   }
-  DataChannelTransportInterface* data_channel_transport() const {
-    return data_channel_transport_;
-  }
-  void set_data_channel_transport(DataChannelTransportInterface* transport) {
-    data_channel_transport_ = transport;
-  }
+  DataChannelTransportInterface* data_channel_transport() const;
+  void set_data_channel_transport(DataChannelTransportInterface* transport);
   const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
-  rtp_data_channels() const {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    return &rtp_data_channels_;
-  }
+  rtp_data_channels() const;
   const std::vector<rtc::scoped_refptr<DataChannel>>* sctp_data_channels()
-      const {
-    RTC_DCHECK_RUN_ON(signaling_thread());
-    return &sctp_data_channels_;
-  }
+      const;
 
   sigslot::signal1<DataChannel*>& SignalDataChannelCreated() {
     RTC_DCHECK_RUN_ON(signaling_thread());
@@ -146,6 +132,11 @@
       const std::vector<std::string>& active_channels,
       bool is_local_update) RTC_RUN_ON(signaling_thread());
 
+  // Called from SendData when data_channel_transport() returns non-null.
+  bool DataChannelSendData(const cricket::SendDataParams& params,
+                           const rtc::CopyOnWriteBuffer& payload,
+                           cricket::SendDataResult* result);
+
   rtc::Thread* network_thread() const;
   rtc::Thread* signaling_thread() const;
 
@@ -189,6 +180,8 @@
 
   // Signals from |data_channel_transport_|.  These are invoked on the
   // signaling thread.
+  // TODO(bugs.webrtc.org/11547): These '_s' signals likely all belong on the
+  // network thread.
   sigslot::signal1<bool> SignalDataChannelTransportWritable_s
       RTC_GUARDED_BY(signaling_thread());
   sigslot::signal2<const cricket::ReceiveDataParams&,
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index b29be33..11dfcc4 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -64,6 +64,7 @@
 // TODO(deadbeef): The fact that these tests use a fake provider makes them not
 // too valuable. Should rewrite using the
 // peerconnection_datachannel_unittest.cc infrastructure.
+// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
 class SctpDataChannelTest : public ::testing::Test {
  protected:
   SctpDataChannelTest()
@@ -71,7 +72,9 @@
         webrtc_data_channel_(DataChannel::Create(provider_.get(),
                                                  cricket::DCT_SCTP,
                                                  "test",
-                                                 init_)) {}
+                                                 init_,
+                                                 rtc::Thread::Current(),
+                                                 rtc::Thread::Current())) {}
 
   void SetChannelReady() {
     provider_->set_transport_available(true);
@@ -111,7 +114,8 @@
 TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
   provider_->set_transport_available(true);
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_,
+                          rtc::Thread::Current(), rtc::Thread::Current());
 
   EXPECT_TRUE(provider_->IsConnected(dc.get()));
   // The sid is not set yet, so it should not have added the streams.
@@ -305,7 +309,8 @@
   webrtc::InternalDataChannelInit init;
   init.id = 1;
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+                          rtc::Thread::Current(), rtc::Thread::Current());
   EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state());
   EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), 1000);
 }
@@ -318,7 +323,8 @@
   init.id = 1;
   init.ordered = false;
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+                          rtc::Thread::Current(), rtc::Thread::Current());
 
   EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
 
@@ -348,7 +354,8 @@
   init.id = 1;
   init.ordered = false;
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+                          rtc::Thread::Current(), rtc::Thread::Current());
 
   EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
 
@@ -449,7 +456,8 @@
 
   SetChannelReady();
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
+                          rtc::Thread::Current(), rtc::Thread::Current());
 
   EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
   EXPECT_EQ(0U, provider_->last_send_data_params().ssrc);
@@ -512,7 +520,8 @@
 
   SetChannelReady();
   rtc::scoped_refptr<DataChannel> dc =
-      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
+      DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
+                          rtc::Thread::Current(), rtc::Thread::Current());
 
   EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
 
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index e581ac0..76f87f2 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2211,6 +2211,7 @@
     UpdateNegotiationNeeded();
   }
   NoteUsageEvent(UsageEvent::DATA_ADDED);
+  // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
   return DataChannelProxy::Create(signaling_thread(), channel.get());
 }
 
@@ -6714,6 +6715,8 @@
     case cricket::DCT_RTP:
     default:
       RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
+      // TODO(bugs.webrtc.org/9987): set_rtp_data_channel() should be called on
+      // the network thread like set_data_channel_transport is.
       data_channel_controller_.set_rtp_data_channel(
           channel_manager()->CreateRtpDataChannel(
               configuration_.media_config, rtp_transport, signaling_thread(),
diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc
index e0965af..013965c 100644
--- a/pc/rtc_stats_collector_unittest.cc
+++ b/pc/rtc_stats_collector_unittest.cc
@@ -1398,11 +1398,14 @@
         report->Get("RTCPeerConnection")->cast_to<RTCPeerConnectionStats>());
   }
 
+  // TODO(bugs.webrtc.org/11547): Supply a separate network thread.
   rtc::scoped_refptr<DataChannel> dummy_channel_a = DataChannel::Create(
-      nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit());
+      nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit(),
+      rtc::Thread::Current(), rtc::Thread::Current());
   pc_->SignalDataChannelCreated()(dummy_channel_a.get());
   rtc::scoped_refptr<DataChannel> dummy_channel_b = DataChannel::Create(
-      nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit());
+      nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit(),
+      rtc::Thread::Current(), rtc::Thread::Current());
   pc_->SignalDataChannelCreated()(dummy_channel_b.get());
 
   dummy_channel_a->SignalOpened(dummy_channel_a.get());
diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h
index c639158..f459552 100644
--- a/pc/test/fake_peer_connection_for_stats.h
+++ b/pc/test/fake_peer_connection_for_stats.h
@@ -174,8 +174,10 @@
 
   void AddSctpDataChannel(const std::string& label,
                           const InternalDataChannelInit& init) {
-    AddSctpDataChannel(DataChannel::Create(&data_channel_provider_,
-                                           cricket::DCT_SCTP, label, init));
+    // TODO(bugs.webrtc.org/11547): Supply a separate network thread.
+    AddSctpDataChannel(DataChannel::Create(
+        &data_channel_provider_, cricket::DCT_SCTP, label, init,
+        rtc::Thread::Current(), rtc::Thread::Current()));
   }
 
   void AddSctpDataChannel(rtc::scoped_refptr<DataChannel> data_channel) {
diff --git a/pc/test/mock_data_channel.h b/pc/test/mock_data_channel.h
index 63f0e6c..bc5f94d 100644
--- a/pc/test/mock_data_channel.h
+++ b/pc/test/mock_data_channel.h
@@ -31,11 +31,15 @@
       uint64_t bytes_sent,
       uint32_t messages_received,
       uint64_t bytes_received,
-      const InternalDataChannelInit& config = InternalDataChannelInit())
+      const InternalDataChannelInit& config = InternalDataChannelInit(),
+      rtc::Thread* signaling_thread = rtc::Thread::Current(),
+      rtc::Thread* network_thread = rtc::Thread::Current())
       : rtc::RefCountedObject<DataChannel>(config,
                                            nullptr,
                                            cricket::DCT_NONE,
-                                           label) {
+                                           label,
+                                           signaling_thread,
+                                           network_thread) {
     EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id));
     EXPECT_CALL(*this, state()).WillRepeatedly(::testing::Return(state));
     EXPECT_CALL(*this, protocol()).WillRepeatedly(::testing::Return(protocol));