Inject signaling and network threads to DataChannel.

Add a few DCHECKs and comments about upcoming work.

Bug: webrtc:11547
Change-Id: I2d42f48cb93f31e70cf9fe4b3b62241c38bc9d8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177106
Reviewed-by: Taylor <deadbeef@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31530}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index e9ea742..9891d50 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -25,37 +25,10 @@
 bool DataChannelController::SendData(const cricket::SendDataParams& params,
                                      const rtc::CopyOnWriteBuffer& payload,
                                      cricket::SendDataResult* result) {
-  // RTC_DCHECK_RUN_ON(signaling_thread());
-  if (data_channel_transport()) {
-    SendDataParams send_params;
-    send_params.type = ToWebrtcDataMessageType(params.type);
-    send_params.ordered = params.ordered;
-    if (params.max_rtx_count >= 0) {
-      send_params.max_rtx_count = params.max_rtx_count;
-    } else if (params.max_rtx_ms >= 0) {
-      send_params.max_rtx_ms = params.max_rtx_ms;
-    }
-
-    RTCError error = network_thread()->Invoke<RTCError>(
-        RTC_FROM_HERE, [this, params, send_params, payload] {
-          return data_channel_transport()->SendData(params.sid, send_params,
-                                                    payload);
-        });
-
-    if (error.ok()) {
-      *result = cricket::SendDataResult::SDR_SUCCESS;
-      return true;
-    } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
-      // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
-      // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
-      *result = cricket::SendDataResult::SDR_BLOCK;
-      return false;
-    }
-    *result = cricket::SendDataResult::SDR_ERROR;
-    return false;
-  } else if (rtp_data_channel()) {
+  if (data_channel_transport())
+    return DataChannelSendData(params, payload, result);
+  if (rtp_data_channel())
     return rtp_data_channel()->SendData(params, payload, result);
-  }
   RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
   return false;
 }
@@ -146,6 +119,14 @@
   data_channel_transport_invoker_->AsyncInvoke<void>(
       RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
         RTC_DCHECK_RUN_ON(signaling_thread());
+        // TODO(bugs.webrtc.org/11547): The data being received should be
+        // delivered on the network thread. The way HandleOpenMessage_s works
+        // right now is that it's called for all types of buffers and operates
+        // as a selector function. Change this so that it's only called for
+        // buffers that it should be able to handle. Once we do that, we can
+        // deliver all other buffers on the network thread (change
+        // SignalDataChannelTransportReceivedData_s to
+        // SignalDataChannelTransportReceivedData_n).
         if (!HandleOpenMessage_s(params, buffer)) {
           SignalDataChannelTransportReceivedData_s(params, buffer);
         }
@@ -261,6 +242,7 @@
     return;
   }
 
+  // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
       DataChannelProxy::Create(signaling_thread(), channel);
   pc_->Observer()->OnDataChannel(std::move(proxy_channel));
@@ -299,7 +281,8 @@
   }
 
   rtc::scoped_refptr<DataChannel> channel(
-      DataChannel::Create(this, data_channel_type(), label, new_config));
+      DataChannel::Create(this, data_channel_type(), label, new_config,
+                          signaling_thread(), network_thread()));
   if (!channel) {
     sid_allocator_.ReleaseSid(new_config.id);
     return nullptr;
@@ -424,9 +407,10 @@
 
 void DataChannelController::UpdateRemoteRtpDataChannels(
     const cricket::StreamParamsVec& streams) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+
   std::vector<std::string> existing_channels;
 
-  RTC_DCHECK_RUN_ON(signaling_thread());
   // Find new and active data channels.
   for (const cricket::StreamParams& params : streams) {
     // The data channel label is either the mslabel or the SSRC if the mslabel
@@ -447,6 +431,44 @@
   UpdateClosingRtpDataChannels(existing_channels, false);
 }
 
+cricket::DataChannelType DataChannelController::data_channel_type() const {
+  // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
+  // RTC_DCHECK_RUN_ON(signaling_thread());
+  return data_channel_type_;
+}
+
+void DataChannelController::set_data_channel_type(
+    cricket::DataChannelType type) {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  data_channel_type_ = type;
+}
+
+DataChannelTransportInterface* DataChannelController::data_channel_transport()
+    const {
+  // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
+  // network thread.
+  // RTC_DCHECK_RUN_ON(network_thread());
+  return data_channel_transport_;
+}
+
+void DataChannelController::set_data_channel_transport(
+    DataChannelTransportInterface* transport) {
+  RTC_DCHECK_RUN_ON(network_thread());
+  data_channel_transport_ = transport;
+}
+
+const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
+DataChannelController::rtp_data_channels() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return &rtp_data_channels_;
+}
+
+const std::vector<rtc::scoped_refptr<DataChannel>>*
+DataChannelController::sctp_data_channels() const {
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  return &sctp_data_channels_;
+}
+
 void DataChannelController::UpdateClosingRtpDataChannels(
     const std::vector<std::string>& active_channels,
     bool is_local_update) {
@@ -483,11 +505,50 @@
     return;
   }
   channel->SetReceiveSsrc(remote_ssrc);
+  // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
   rtc::scoped_refptr<DataChannelInterface> proxy_channel =
       DataChannelProxy::Create(signaling_thread(), channel);
   pc_->Observer()->OnDataChannel(std::move(proxy_channel));
 }
 
+bool DataChannelController::DataChannelSendData(
+    const cricket::SendDataParams& params,
+    const rtc::CopyOnWriteBuffer& payload,
+    cricket::SendDataResult* result) {
+  // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+  // thread instead. Remove the Invoke() below and move assocated state to
+  // the network thread.
+  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK(data_channel_transport());
+
+  SendDataParams send_params;
+  send_params.type = ToWebrtcDataMessageType(params.type);
+  send_params.ordered = params.ordered;
+  if (params.max_rtx_count >= 0) {
+    send_params.max_rtx_count = params.max_rtx_count;
+  } else if (params.max_rtx_ms >= 0) {
+    send_params.max_rtx_ms = params.max_rtx_ms;
+  }
+
+  RTCError error = network_thread()->Invoke<RTCError>(
+      RTC_FROM_HERE, [this, params, send_params, payload] {
+        return data_channel_transport()->SendData(params.sid, send_params,
+                                                  payload);
+      });
+
+  if (error.ok()) {
+    *result = cricket::SendDataResult::SDR_SUCCESS;
+    return true;
+  } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
+    // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
+    // TODO(mellem):  Stop using RTCError here and get rid of the mapping.
+    *result = cricket::SendDataResult::SDR_BLOCK;
+    return false;
+  }
+  *result = cricket::SendDataResult::SDR_ERROR;
+  return false;
+}
+
 rtc::Thread* DataChannelController::network_thread() const {
   return pc_->network_thread();
 }