Inject signaling and network threads to DataChannel.
Add a few DCHECKs and comments about upcoming work.
Bug: webrtc:11547
Change-Id: I2d42f48cb93f31e70cf9fe4b3b62241c38bc9d8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177106
Reviewed-by: Taylor <deadbeef@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31530}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index e9ea742..9891d50 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -25,37 +25,10 @@
bool DataChannelController::SendData(const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
- // RTC_DCHECK_RUN_ON(signaling_thread());
- if (data_channel_transport()) {
- SendDataParams send_params;
- send_params.type = ToWebrtcDataMessageType(params.type);
- send_params.ordered = params.ordered;
- if (params.max_rtx_count >= 0) {
- send_params.max_rtx_count = params.max_rtx_count;
- } else if (params.max_rtx_ms >= 0) {
- send_params.max_rtx_ms = params.max_rtx_ms;
- }
-
- RTCError error = network_thread()->Invoke<RTCError>(
- RTC_FROM_HERE, [this, params, send_params, payload] {
- return data_channel_transport()->SendData(params.sid, send_params,
- payload);
- });
-
- if (error.ok()) {
- *result = cricket::SendDataResult::SDR_SUCCESS;
- return true;
- } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
- // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
- // TODO(mellem): Stop using RTCError here and get rid of the mapping.
- *result = cricket::SendDataResult::SDR_BLOCK;
- return false;
- }
- *result = cricket::SendDataResult::SDR_ERROR;
- return false;
- } else if (rtp_data_channel()) {
+ if (data_channel_transport())
+ return DataChannelSendData(params, payload, result);
+ if (rtp_data_channel())
return rtp_data_channel()->SendData(params, payload, result);
- }
RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
return false;
}
@@ -146,6 +119,14 @@
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread());
+ // TODO(bugs.webrtc.org/11547): The data being received should be
+ // delivered on the network thread. The way HandleOpenMessage_s works
+ // right now is that it's called for all types of buffers and operates
+ // as a selector function. Change this so that it's only called for
+ // buffers that it should be able to handle. Once we do that, we can
+ // deliver all other buffers on the network thread (change
+ // SignalDataChannelTransportReceivedData_s to
+ // SignalDataChannelTransportReceivedData_n).
if (!HandleOpenMessage_s(params, buffer)) {
SignalDataChannelTransportReceivedData_s(params, buffer);
}
@@ -261,6 +242,7 @@
return;
}
+ // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
@@ -299,7 +281,8 @@
}
rtc::scoped_refptr<DataChannel> channel(
- DataChannel::Create(this, data_channel_type(), label, new_config));
+ DataChannel::Create(this, data_channel_type(), label, new_config,
+ signaling_thread(), network_thread()));
if (!channel) {
sid_allocator_.ReleaseSid(new_config.id);
return nullptr;
@@ -424,9 +407,10 @@
void DataChannelController::UpdateRemoteRtpDataChannels(
const cricket::StreamParamsVec& streams) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+
std::vector<std::string> existing_channels;
- RTC_DCHECK_RUN_ON(signaling_thread());
// Find new and active data channels.
for (const cricket::StreamParams& params : streams) {
// The data channel label is either the mslabel or the SSRC if the mslabel
@@ -447,6 +431,44 @@
UpdateClosingRtpDataChannels(existing_channels, false);
}
+cricket::DataChannelType DataChannelController::data_channel_type() const {
+ // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
+ // RTC_DCHECK_RUN_ON(signaling_thread());
+ return data_channel_type_;
+}
+
+void DataChannelController::set_data_channel_type(
+ cricket::DataChannelType type) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ data_channel_type_ = type;
+}
+
+DataChannelTransportInterface* DataChannelController::data_channel_transport()
+ const {
+ // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
+ // network thread.
+ // RTC_DCHECK_RUN_ON(network_thread());
+ return data_channel_transport_;
+}
+
+void DataChannelController::set_data_channel_transport(
+ DataChannelTransportInterface* transport) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ data_channel_transport_ = transport;
+}
+
+const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
+DataChannelController::rtp_data_channels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return &rtp_data_channels_;
+}
+
+const std::vector<rtc::scoped_refptr<DataChannel>>*
+DataChannelController::sctp_data_channels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return &sctp_data_channels_;
+}
+
void DataChannelController::UpdateClosingRtpDataChannels(
const std::vector<std::string>& active_channels,
bool is_local_update) {
@@ -483,11 +505,50 @@
return;
}
channel->SetReceiveSsrc(remote_ssrc);
+ // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
}
+bool DataChannelController::DataChannelSendData(
+ const cricket::SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result) {
+ // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+ // thread instead. Remove the Invoke() below and move associated state to
+ // the network thread.
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ RTC_DCHECK(data_channel_transport());
+
+ SendDataParams send_params;
+ send_params.type = ToWebrtcDataMessageType(params.type);
+ send_params.ordered = params.ordered;
+ if (params.max_rtx_count >= 0) {
+ send_params.max_rtx_count = params.max_rtx_count;
+ } else if (params.max_rtx_ms >= 0) {
+ send_params.max_rtx_ms = params.max_rtx_ms;
+ }
+
+ RTCError error = network_thread()->Invoke<RTCError>(
+ RTC_FROM_HERE, [this, params, send_params, payload] {
+ return data_channel_transport()->SendData(params.sid, send_params,
+ payload);
+ });
+
+ if (error.ok()) {
+ *result = cricket::SendDataResult::SDR_SUCCESS;
+ return true;
+ } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
+ // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
+ // TODO(mellem): Stop using RTCError here and get rid of the mapping.
+ *result = cricket::SendDataResult::SDR_BLOCK;
+ return false;
+ }
+ *result = cricket::SendDataResult::SDR_ERROR;
+ return false;
+}
+
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}