Inject signaling and network threads to DataChannel.
Add a few DCHECKs and comments about upcoming work.
Bug: webrtc:11547
Change-Id: I2d42f48cb93f31e70cf9fe4b3b62241c38bc9d8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177106
Reviewed-by: Taylor <deadbeef@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31530}
diff --git a/pc/data_channel.cc b/pc/data_channel.cc
index 795bf8d..e4f658c 100644
--- a/pc/data_channel.cc
+++ b/pc/data_channel.cc
@@ -137,9 +137,12 @@
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label,
- const InternalDataChannelInit& config) {
+ const InternalDataChannelInit& config,
+ rtc::Thread* signaling_thread,
+ rtc::Thread* network_thread) {
rtc::scoped_refptr<DataChannel> channel(
- new rtc::RefCountedObject<DataChannel>(config, provider, dct, label));
+ new rtc::RefCountedObject<DataChannel>(config, provider, dct, label,
+ signaling_thread, network_thread));
if (!channel->Init()) {
return nullptr;
}
@@ -155,8 +158,12 @@
DataChannel::DataChannel(const InternalDataChannelInit& config,
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
- const std::string& label)
- : internal_id_(GenerateUniqueId()),
+ const std::string& label,
+ rtc::Thread* signaling_thread,
+ rtc::Thread* network_thread)
+ : signaling_thread_(signaling_thread),
+ network_thread_(network_thread),
+ internal_id_(GenerateUniqueId()),
label_(label),
config_(config),
observer_(nullptr),
@@ -174,9 +181,12 @@
receive_ssrc_set_(false),
writable_(false),
send_ssrc_(0),
- receive_ssrc_(0) {}
+ receive_ssrc_(0) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+}
bool DataChannel::Init() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (data_channel_type_ == cricket::DCT_RTP) {
if (config_.reliable || config_.id != -1 || config_.maxRetransmits ||
config_.maxRetransmitTime) {
@@ -229,18 +239,23 @@
return true;
}
-DataChannel::~DataChannel() {}
+DataChannel::~DataChannel() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
observer_ = observer;
DeliverQueuedReceivedData();
}
void DataChannel::UnregisterObserver() {
- observer_ = NULL;
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ observer_ = nullptr;
}
bool DataChannel::reliable() const {
+ // May be called on any thread.
if (data_channel_type_ == cricket::DCT_RTP) {
return false;
} else {
@@ -249,10 +264,12 @@
}
uint64_t DataChannel::buffered_amount() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
return buffered_amount_;
}
void DataChannel::Close() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == kClosed)
return;
send_ssrc_ = 0;
@@ -262,11 +279,42 @@
UpdateState();
}
+DataChannel::DataState DataChannel::state() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return state_;
+}
+
RTCError DataChannel::error() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
return error_;
}
+uint32_t DataChannel::messages_sent() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return messages_sent_;
+}
+
+uint64_t DataChannel::bytes_sent() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return bytes_sent_;
+}
+
+uint32_t DataChannel::messages_received() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return messages_received_;
+}
+
+uint64_t DataChannel::bytes_received() const {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return bytes_received_;
+}
+
bool DataChannel::Send(const DataBuffer& buffer) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ // TODO(bugs.webrtc.org/11547): Expect this method to be called on the network
+ // thread. Bring buffer management etc to the network thread and keep the
+ // operational state management on the signaling thread.
+
buffered_amount_ += buffer.size();
if (state_ != kOpen) {
return false;
@@ -306,6 +354,7 @@
}
void DataChannel::SetReceiveSsrc(uint32_t receive_ssrc) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
if (receive_ssrc_set_) {
@@ -329,6 +378,7 @@
}
void DataChannel::OnClosingProcedureStartedRemotely(int sid) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (IsSctpLike(data_channel_type_) && sid == config_.id &&
state_ != kClosing && state_ != kClosed) {
// Don't bother sending queued data since the side that initiated the
@@ -345,6 +395,7 @@
}
void DataChannel::OnClosingProcedureComplete(int sid) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (IsSctpLike(data_channel_type_) && sid == config_.id) {
// If the closing procedure is complete, we should have finished sending
// all pending data and transitioned to kClosing already.
@@ -356,6 +407,7 @@
}
void DataChannel::OnTransportChannelCreated() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(IsSctpLike(data_channel_type_));
if (!connected_to_provider_) {
connected_to_provider_ = provider_->ConnectDataChannel(this);
@@ -385,6 +437,7 @@
}
void DataChannel::SetSendSsrc(uint32_t send_ssrc) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_DCHECK(data_channel_type_ == cricket::DCT_RTP);
if (send_ssrc_set_) {
return;
@@ -396,6 +449,7 @@
void DataChannel::OnDataReceived(const cricket::ReceiveDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (data_channel_type_ == cricket::DCT_RTP && params.ssrc != receive_ssrc_) {
return;
}
@@ -462,6 +516,8 @@
}
void DataChannel::OnChannelReady(bool writable) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+
writable_ = writable;
if (!writable) {
return;
@@ -473,6 +529,8 @@
}
void DataChannel::CloseAbruptlyWithError(RTCError error) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+
if (state_ == kClosed) {
return;
}
@@ -501,6 +559,7 @@
}
void DataChannel::UpdateState() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
// UpdateState determines what to do from a few state variables. Include
// all conditions required for each state transition here for
// clarity. OnChannelReady(true) will send any queued data and then invoke
@@ -568,6 +627,7 @@
}
void DataChannel::SetState(DataState state) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (state_ == state) {
return;
}
@@ -584,6 +644,7 @@
}
void DataChannel::DisconnectFromProvider() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (!connected_to_provider_)
return;
@@ -592,6 +653,7 @@
}
void DataChannel::DeliverQueuedReceivedData() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (!observer_) {
return;
}
@@ -605,6 +667,7 @@
}
void DataChannel::SendQueuedDataMessages() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
if (queued_send_data_.Empty()) {
return;
}
@@ -623,6 +686,7 @@
bool DataChannel::SendDataMessage(const DataBuffer& buffer,
bool queue_if_blocked) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
cricket::SendDataParams send_params;
if (IsSctpLike(data_channel_type_)) {
@@ -681,6 +745,7 @@
}
bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
size_t start_buffered_amount = queued_send_data_.byte_count();
if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
@@ -691,6 +756,7 @@
}
void DataChannel::SendQueuedControlMessages() {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@@ -701,10 +767,12 @@
}
void DataChannel::QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
}
bool DataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
RTC_DCHECK(IsSctpLike(data_channel_type_));
diff --git a/pc/data_channel.h b/pc/data_channel.h
index 1ee2679..e843250 100644
--- a/pc/data_channel.h
+++ b/pc/data_channel.h
@@ -117,47 +117,51 @@
DataChannelProviderInterface* provider,
cricket::DataChannelType dct,
const std::string& label,
- const InternalDataChannelInit& config);
+ const InternalDataChannelInit& config,
+ rtc::Thread* signaling_thread,
+ rtc::Thread* network_thread);
static bool IsSctpLike(cricket::DataChannelType type);
- virtual void RegisterObserver(DataChannelObserver* observer);
- virtual void UnregisterObserver();
+ void RegisterObserver(DataChannelObserver* observer) override;
+ void UnregisterObserver() override;
- virtual std::string label() const { return label_; }
- virtual bool reliable() const;
- virtual bool ordered() const { return config_.ordered; }
+ std::string label() const override { return label_; }
+ bool reliable() const override;
+ bool ordered() const override { return config_.ordered; }
// Backwards compatible accessors
- virtual uint16_t maxRetransmitTime() const {
+ uint16_t maxRetransmitTime() const override {
return config_.maxRetransmitTime ? *config_.maxRetransmitTime
: static_cast<uint16_t>(-1);
}
- virtual uint16_t maxRetransmits() const {
+ uint16_t maxRetransmits() const override {
return config_.maxRetransmits ? *config_.maxRetransmits
: static_cast<uint16_t>(-1);
}
- virtual absl::optional<int> maxPacketLifeTime() const {
+ absl::optional<int> maxPacketLifeTime() const override {
return config_.maxRetransmitTime;
}
- virtual absl::optional<int> maxRetransmitsOpt() const {
+ absl::optional<int> maxRetransmitsOpt() const override {
return config_.maxRetransmits;
}
- virtual std::string protocol() const { return config_.protocol; }
- virtual bool negotiated() const { return config_.negotiated; }
- virtual int id() const { return config_.id; }
- virtual Priority priority() const {
+ std::string protocol() const override { return config_.protocol; }
+ bool negotiated() const override { return config_.negotiated; }
+ int id() const override { return config_.id; }
+ Priority priority() const override {
return config_.priority ? *config_.priority : Priority::kLow;
}
+
virtual int internal_id() const { return internal_id_; }
- virtual uint64_t buffered_amount() const;
- virtual void Close();
- virtual DataState state() const { return state_; }
- virtual RTCError error() const;
- virtual uint32_t messages_sent() const { return messages_sent_; }
- virtual uint64_t bytes_sent() const { return bytes_sent_; }
- virtual uint32_t messages_received() const { return messages_received_; }
- virtual uint64_t bytes_received() const { return bytes_received_; }
- virtual bool Send(const DataBuffer& buffer);
+
+ uint64_t buffered_amount() const override;
+ void Close() override;
+ DataState state() const override;
+ RTCError error() const override;
+ uint32_t messages_sent() const override;
+ uint64_t bytes_sent() const override;
+ uint32_t messages_received() const override;
+ uint64_t bytes_received() const override;
+ bool Send(const DataBuffer& buffer) override;
// Close immediately, ignoring any queued data or closing procedure.
// This is called for RTP data channels when SDP indicates a channel should
@@ -234,8 +238,10 @@
DataChannel(const InternalDataChannelInit& config,
DataChannelProviderInterface* client,
cricket::DataChannelType dct,
- const std::string& label);
- virtual ~DataChannel();
+ const std::string& label,
+ rtc::Thread* signaling_thread,
+ rtc::Thread* network_thread);
+ ~DataChannel() override;
private:
// A packet queue which tracks the total queued bytes. Queued packets are
@@ -284,36 +290,38 @@
void QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer);
bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
+ rtc::Thread* const signaling_thread_;
+ rtc::Thread* const network_thread_;
const int internal_id_;
const std::string label_;
const InternalDataChannelInit config_;
- DataChannelObserver* observer_;
- DataState state_;
- RTCError error_;
- uint32_t messages_sent_;
- uint64_t bytes_sent_;
- uint32_t messages_received_;
- uint64_t bytes_received_;
+ DataChannelObserver* observer_ RTC_GUARDED_BY(signaling_thread_);
+ DataState state_ RTC_GUARDED_BY(signaling_thread_);
+ RTCError error_ RTC_GUARDED_BY(signaling_thread_);
+ uint32_t messages_sent_ RTC_GUARDED_BY(signaling_thread_);
+ uint64_t bytes_sent_ RTC_GUARDED_BY(signaling_thread_);
+ uint32_t messages_received_ RTC_GUARDED_BY(signaling_thread_);
+ uint64_t bytes_received_ RTC_GUARDED_BY(signaling_thread_);
// Number of bytes of data that have been queued using Send(). Increased
// before each transport send and decreased after each successful send.
- uint64_t buffered_amount_;
+ uint64_t buffered_amount_ RTC_GUARDED_BY(signaling_thread_);
const cricket::DataChannelType data_channel_type_;
- DataChannelProviderInterface* provider_;
- HandshakeState handshake_state_;
- bool connected_to_provider_;
- bool send_ssrc_set_;
- bool receive_ssrc_set_;
- bool writable_;
+ DataChannelProviderInterface* const provider_;
+ HandshakeState handshake_state_ RTC_GUARDED_BY(signaling_thread_);
+ bool connected_to_provider_ RTC_GUARDED_BY(signaling_thread_);
+ bool send_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
+ bool receive_ssrc_set_ RTC_GUARDED_BY(signaling_thread_);
+ bool writable_ RTC_GUARDED_BY(signaling_thread_);
// Did we already start the graceful SCTP closing procedure?
- bool started_closing_procedure_ = false;
- uint32_t send_ssrc_;
- uint32_t receive_ssrc_;
+ bool started_closing_procedure_ RTC_GUARDED_BY(signaling_thread_) = false;
+ uint32_t send_ssrc_ RTC_GUARDED_BY(signaling_thread_);
+ uint32_t receive_ssrc_ RTC_GUARDED_BY(signaling_thread_);
// Control messages that always have to get sent out before any queued
// data.
- PacketQueue queued_control_data_;
- PacketQueue queued_received_data_;
- PacketQueue queued_send_data_;
- rtc::AsyncInvoker invoker_;
+ PacketQueue queued_control_data_ RTC_GUARDED_BY(signaling_thread_);
+ PacketQueue queued_received_data_ RTC_GUARDED_BY(signaling_thread_);
+ PacketQueue queued_send_data_ RTC_GUARDED_BY(signaling_thread_);
+ rtc::AsyncInvoker invoker_ RTC_GUARDED_BY(signaling_thread_);
};
// Define proxy for DataChannelInterface.
@@ -341,6 +349,7 @@
PROXY_CONSTMETHOD0(uint64_t, bytes_received)
PROXY_CONSTMETHOD0(uint64_t, buffered_amount)
PROXY_METHOD0(void, Close)
+// TODO(bugs.webrtc.org/11547): Change to run on the network thread.
PROXY_METHOD1(bool, Send, const DataBuffer&)
END_PROXY_MAP()
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index e9ea742..9891d50 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -25,37 +25,10 @@
bool DataChannelController::SendData(const cricket::SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload,
cricket::SendDataResult* result) {
- // RTC_DCHECK_RUN_ON(signaling_thread());
- if (data_channel_transport()) {
- SendDataParams send_params;
- send_params.type = ToWebrtcDataMessageType(params.type);
- send_params.ordered = params.ordered;
- if (params.max_rtx_count >= 0) {
- send_params.max_rtx_count = params.max_rtx_count;
- } else if (params.max_rtx_ms >= 0) {
- send_params.max_rtx_ms = params.max_rtx_ms;
- }
-
- RTCError error = network_thread()->Invoke<RTCError>(
- RTC_FROM_HERE, [this, params, send_params, payload] {
- return data_channel_transport()->SendData(params.sid, send_params,
- payload);
- });
-
- if (error.ok()) {
- *result = cricket::SendDataResult::SDR_SUCCESS;
- return true;
- } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
- // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
- // TODO(mellem): Stop using RTCError here and get rid of the mapping.
- *result = cricket::SendDataResult::SDR_BLOCK;
- return false;
- }
- *result = cricket::SendDataResult::SDR_ERROR;
- return false;
- } else if (rtp_data_channel()) {
+ if (data_channel_transport())
+ return DataChannelSendData(params, payload, result);
+ if (rtp_data_channel())
return rtp_data_channel()->SendData(params, payload, result);
- }
RTC_LOG(LS_ERROR) << "SendData called before transport is ready";
return false;
}
@@ -146,6 +119,14 @@
data_channel_transport_invoker_->AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread(), [this, params, buffer] {
RTC_DCHECK_RUN_ON(signaling_thread());
+ // TODO(bugs.webrtc.org/11547): The data being received should be
+ // delivered on the network thread. The way HandleOpenMessage_s works
+ // right now is that it's called for all types of buffers and operates
+ // as a selector function. Change this so that it's only called for
+ // buffers that it should be able to handle. Once we do that, we can
+ // deliver all other buffers on the network thread (change
+ // SignalDataChannelTransportReceivedData_s to
+ // SignalDataChannelTransportReceivedData_n).
if (!HandleOpenMessage_s(params, buffer)) {
SignalDataChannelTransportReceivedData_s(params, buffer);
}
@@ -261,6 +242,7 @@
return;
}
+ // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
@@ -299,7 +281,8 @@
}
rtc::scoped_refptr<DataChannel> channel(
- DataChannel::Create(this, data_channel_type(), label, new_config));
+ DataChannel::Create(this, data_channel_type(), label, new_config,
+ signaling_thread(), network_thread()));
if (!channel) {
sid_allocator_.ReleaseSid(new_config.id);
return nullptr;
@@ -424,9 +407,10 @@
void DataChannelController::UpdateRemoteRtpDataChannels(
const cricket::StreamParamsVec& streams) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+
std::vector<std::string> existing_channels;
- RTC_DCHECK_RUN_ON(signaling_thread());
// Find new and active data channels.
for (const cricket::StreamParams& params : streams) {
// The data channel label is either the mslabel or the SSRC if the mslabel
@@ -447,6 +431,44 @@
UpdateClosingRtpDataChannels(existing_channels, false);
}
+cricket::DataChannelType DataChannelController::data_channel_type() const {
+ // TODO(bugs.webrtc.org/9987): Should be restricted to the signaling thread.
+ // RTC_DCHECK_RUN_ON(signaling_thread());
+ return data_channel_type_;
+}
+
+void DataChannelController::set_data_channel_type(
+ cricket::DataChannelType type) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ data_channel_type_ = type;
+}
+
+DataChannelTransportInterface* DataChannelController::data_channel_transport()
+ const {
+ // TODO(bugs.webrtc.org/11547): Only allow this accessor to be called on the
+ // network thread.
+ // RTC_DCHECK_RUN_ON(network_thread());
+ return data_channel_transport_;
+}
+
+void DataChannelController::set_data_channel_transport(
+ DataChannelTransportInterface* transport) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ data_channel_transport_ = transport;
+}
+
+const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
+DataChannelController::rtp_data_channels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return &rtp_data_channels_;
+}
+
+const std::vector<rtc::scoped_refptr<DataChannel>>*
+DataChannelController::sctp_data_channels() const {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return &sctp_data_channels_;
+}
+
void DataChannelController::UpdateClosingRtpDataChannels(
const std::vector<std::string>& active_channels,
bool is_local_update) {
@@ -483,11 +505,50 @@
return;
}
channel->SetReceiveSsrc(remote_ssrc);
+ // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
rtc::scoped_refptr<DataChannelInterface> proxy_channel =
DataChannelProxy::Create(signaling_thread(), channel);
pc_->Observer()->OnDataChannel(std::move(proxy_channel));
}
+bool DataChannelController::DataChannelSendData(
+ const cricket::SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result) {
+ // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+ // thread instead. Remove the Invoke() below and move assocated state to
+ // the network thread.
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ RTC_DCHECK(data_channel_transport());
+
+ SendDataParams send_params;
+ send_params.type = ToWebrtcDataMessageType(params.type);
+ send_params.ordered = params.ordered;
+ if (params.max_rtx_count >= 0) {
+ send_params.max_rtx_count = params.max_rtx_count;
+ } else if (params.max_rtx_ms >= 0) {
+ send_params.max_rtx_ms = params.max_rtx_ms;
+ }
+
+ RTCError error = network_thread()->Invoke<RTCError>(
+ RTC_FROM_HERE, [this, params, send_params, payload] {
+ return data_channel_transport()->SendData(params.sid, send_params,
+ payload);
+ });
+
+ if (error.ok()) {
+ *result = cricket::SendDataResult::SDR_SUCCESS;
+ return true;
+ } else if (error.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
+ // SCTP transport uses RESOURCE_EXHAUSTED when it's blocked.
+ // TODO(mellem): Stop using RTCError here and get rid of the mapping.
+ *result = cricket::SendDataResult::SDR_BLOCK;
+ return false;
+ }
+ *result = cricket::SendDataResult::SDR_ERROR;
+ return false;
+}
+
rtc::Thread* DataChannelController::network_thread() const {
return pc_->network_thread();
}
diff --git a/pc/data_channel_controller.h b/pc/data_channel_controller.h
index 60bcbb3..156bbe5 100644
--- a/pc/data_channel_controller.h
+++ b/pc/data_channel_controller.h
@@ -89,34 +89,20 @@
void UpdateRemoteRtpDataChannels(const cricket::StreamParamsVec& streams);
// Accessors
- cricket::DataChannelType data_channel_type() const {
- return data_channel_type_;
- }
- void set_data_channel_type(cricket::DataChannelType type) {
- data_channel_type_ = type;
- }
+ cricket::DataChannelType data_channel_type() const;
+ void set_data_channel_type(cricket::DataChannelType type);
cricket::RtpDataChannel* rtp_data_channel() const {
return rtp_data_channel_;
}
void set_rtp_data_channel(cricket::RtpDataChannel* channel) {
rtp_data_channel_ = channel;
}
- DataChannelTransportInterface* data_channel_transport() const {
- return data_channel_transport_;
- }
- void set_data_channel_transport(DataChannelTransportInterface* transport) {
- data_channel_transport_ = transport;
- }
+ DataChannelTransportInterface* data_channel_transport() const;
+ void set_data_channel_transport(DataChannelTransportInterface* transport);
const std::map<std::string, rtc::scoped_refptr<DataChannel>>*
- rtp_data_channels() const {
- RTC_DCHECK_RUN_ON(signaling_thread());
- return &rtp_data_channels_;
- }
+ rtp_data_channels() const;
const std::vector<rtc::scoped_refptr<DataChannel>>* sctp_data_channels()
- const {
- RTC_DCHECK_RUN_ON(signaling_thread());
- return &sctp_data_channels_;
- }
+ const;
sigslot::signal1<DataChannel*>& SignalDataChannelCreated() {
RTC_DCHECK_RUN_ON(signaling_thread());
@@ -146,6 +132,11 @@
const std::vector<std::string>& active_channels,
bool is_local_update) RTC_RUN_ON(signaling_thread());
+ // Called from SendData when data_channel_transport() is true.
+ bool DataChannelSendData(const cricket::SendDataParams& params,
+ const rtc::CopyOnWriteBuffer& payload,
+ cricket::SendDataResult* result);
+
rtc::Thread* network_thread() const;
rtc::Thread* signaling_thread() const;
@@ -189,6 +180,8 @@
// Signals from |data_channel_transport_|. These are invoked on the
// signaling thread.
+ // TODO(bugs.webrtc.org/11547): These '_s' signals likely all belong on the
+ // network thread.
sigslot::signal1<bool> SignalDataChannelTransportWritable_s
RTC_GUARDED_BY(signaling_thread());
sigslot::signal2<const cricket::ReceiveDataParams&,
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index b29be33..11dfcc4 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -64,6 +64,7 @@
// TODO(deadbeef): The fact that these tests use a fake provider makes them not
// too valuable. Should rewrite using the
// peerconnection_datachannel_unittest.cc infrastructure.
+// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
class SctpDataChannelTest : public ::testing::Test {
protected:
SctpDataChannelTest()
@@ -71,7 +72,9 @@
webrtc_data_channel_(DataChannel::Create(provider_.get(),
cricket::DCT_SCTP,
"test",
- init_)) {}
+ init_,
+ rtc::Thread::Current(),
+ rtc::Thread::Current())) {}
void SetChannelReady() {
provider_->set_transport_available(true);
@@ -111,7 +114,8 @@
TEST_F(SctpDataChannelTest, ConnectedToTransportOnCreated) {
provider_->set_transport_available(true);
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init_,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_TRUE(provider_->IsConnected(dc.get()));
// The sid is not set yet, so it should not have added the streams.
@@ -305,7 +309,8 @@
webrtc::InternalDataChannelInit init;
init.id = 1;
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ(webrtc::DataChannelInterface::kConnecting, dc->state());
EXPECT_TRUE_WAIT(webrtc::DataChannelInterface::kOpen == dc->state(), 1000);
}
@@ -318,7 +323,8 @@
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
@@ -348,7 +354,8 @@
init.id = 1;
init.ordered = false;
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", init,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
@@ -449,7 +456,8 @@
SetChannelReady();
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
EXPECT_EQ(0U, provider_->last_send_data_params().ssrc);
@@ -512,7 +520,8 @@
SetChannelReady();
rtc::scoped_refptr<DataChannel> dc =
- DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config);
+ DataChannel::Create(provider_.get(), cricket::DCT_SCTP, "test1", config,
+ rtc::Thread::Current(), rtc::Thread::Current());
EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen, dc->state(), 1000);
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index e581ac0..76f87f2 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -2211,6 +2211,7 @@
UpdateNegotiationNeeded();
}
NoteUsageEvent(UsageEvent::DATA_ADDED);
+ // TODO(bugs.webrtc.org/11547): Inject the network thread as well.
return DataChannelProxy::Create(signaling_thread(), channel.get());
}
@@ -6714,6 +6715,8 @@
case cricket::DCT_RTP:
default:
RtpTransportInternal* rtp_transport = GetRtpTransport(mid);
+ // TODO(bugs.webrtc.org/9987): set_rtp_data_channel() should be called on
+ // the network thread like set_data_channel_transport is.
data_channel_controller_.set_rtp_data_channel(
channel_manager()->CreateRtpDataChannel(
configuration_.media_config, rtp_transport, signaling_thread(),
diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc
index e0965af..013965c 100644
--- a/pc/rtc_stats_collector_unittest.cc
+++ b/pc/rtc_stats_collector_unittest.cc
@@ -1398,11 +1398,14 @@
report->Get("RTCPeerConnection")->cast_to<RTCPeerConnectionStats>());
}
+ // TODO(bugs.webrtc.org/11547): Supply a separate network thread.
rtc::scoped_refptr<DataChannel> dummy_channel_a = DataChannel::Create(
- nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit());
+ nullptr, cricket::DCT_NONE, "DummyChannelA", InternalDataChannelInit(),
+ rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalDataChannelCreated()(dummy_channel_a.get());
rtc::scoped_refptr<DataChannel> dummy_channel_b = DataChannel::Create(
- nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit());
+ nullptr, cricket::DCT_NONE, "DummyChannelB", InternalDataChannelInit(),
+ rtc::Thread::Current(), rtc::Thread::Current());
pc_->SignalDataChannelCreated()(dummy_channel_b.get());
dummy_channel_a->SignalOpened(dummy_channel_a.get());
diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h
index c639158..f459552 100644
--- a/pc/test/fake_peer_connection_for_stats.h
+++ b/pc/test/fake_peer_connection_for_stats.h
@@ -174,8 +174,10 @@
void AddSctpDataChannel(const std::string& label,
const InternalDataChannelInit& init) {
- AddSctpDataChannel(DataChannel::Create(&data_channel_provider_,
- cricket::DCT_SCTP, label, init));
+ // TODO(bugs.webrtc.org/11547): Supply a separate network thread.
+ AddSctpDataChannel(DataChannel::Create(
+ &data_channel_provider_, cricket::DCT_SCTP, label, init,
+ rtc::Thread::Current(), rtc::Thread::Current()));
}
void AddSctpDataChannel(rtc::scoped_refptr<DataChannel> data_channel) {
diff --git a/pc/test/mock_data_channel.h b/pc/test/mock_data_channel.h
index 63f0e6c..bc5f94d 100644
--- a/pc/test/mock_data_channel.h
+++ b/pc/test/mock_data_channel.h
@@ -31,11 +31,15 @@
uint64_t bytes_sent,
uint32_t messages_received,
uint64_t bytes_received,
- const InternalDataChannelInit& config = InternalDataChannelInit())
+ const InternalDataChannelInit& config = InternalDataChannelInit(),
+ rtc::Thread* signaling_thread = rtc::Thread::Current(),
+ rtc::Thread* network_thread = rtc::Thread::Current())
: rtc::RefCountedObject<DataChannel>(config,
nullptr,
cricket::DCT_NONE,
- label) {
+ label,
+ signaling_thread,
+ network_thread) {
EXPECT_CALL(*this, id()).WillRepeatedly(::testing::Return(id));
EXPECT_CALL(*this, state()).WillRepeatedly(::testing::Return(state));
EXPECT_CALL(*this, protocol()).WillRepeatedly(::testing::Return(protocol));