[DataChannelController] Associate two methods with the network thread
Make DataChannelController's AddSctpDataStream and
RemoveSctpDataStream be required to be called on the network thread.
This moves blocking calls within those methods over to the
SctpDataChannel class instead.
For production code there's no functional change in this CL. However, this CL:
1) Introduces an actual dedicated network thread to
DataChannelController and SctpDataChannel tests.
2) Removes two data_channel_transport() checks inside DCC that
were being done on the wrong thread (signaling) and
3) introduces a network calling block to SctpDataChannel, where more
network thread related work needs to be done and can be bundled.
(to be done in follow-up CLs).
Bug: webrtc:11547
Change-Id: I6787ac395e61d4a25ae3a74a123e3357cbb46b54
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298052
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39688}
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 383e746..ce6896a 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -2461,6 +2461,7 @@
"../rtc_base:net_helper",
"../rtc_base:network",
"../rtc_base:network_constants",
+ "../rtc_base:null_socket_server",
"../rtc_base:refcount",
"../rtc_base:rtc_base_tests_utils",
"../rtc_base:rtc_certificate_generator",
@@ -2562,6 +2563,7 @@
":pc_test_utils",
":peer_connection_internal",
":sctp_data_channel",
+ "../rtc_base:null_socket_server",
"../test:run_loop",
"../test:test_support",
]
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 275c82c..c6139a9 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -44,22 +44,16 @@
}
void DataChannelController::AddSctpDataStream(StreamId sid) {
+ RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
- network_thread()->BlockingCall([this, sid] {
- if (data_channel_transport()) {
- data_channel_transport()->OpenChannel(sid.stream_id_int());
- }
- });
+ data_channel_transport()->OpenChannel(sid.stream_id_int());
}
}
void DataChannelController::RemoveSctpDataStream(StreamId sid) {
+ RTC_DCHECK_RUN_ON(network_thread());
if (data_channel_transport()) {
- network_thread()->BlockingCall([this, sid] {
- if (data_channel_transport()) {
- data_channel_transport()->CloseChannel(sid.stream_id_int());
- }
- });
+ data_channel_transport()->CloseChannel(sid.stream_id_int());
}
}
diff --git a/pc/data_channel_controller_unittest.cc b/pc/data_channel_controller_unittest.cc
index f5575b7..f4bcbfc 100644
--- a/pc/data_channel_controller_unittest.cc
+++ b/pc/data_channel_controller_unittest.cc
@@ -15,6 +15,7 @@
#include "pc/peer_connection_internal.h"
#include "pc/sctp_data_channel.h"
#include "pc/test/mock_peer_connection_internal.h"
+#include "rtc_base/null_socket_server.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/run_loop.h"
@@ -44,17 +45,22 @@
class DataChannelControllerTest : public ::testing::Test {
protected:
- DataChannelControllerTest() {
+ DataChannelControllerTest()
+ : network_thread_(std::make_unique<rtc::NullSocketServer>()) {
+ network_thread_.Start();
pc_ = rtc::make_ref_counted<NiceMock<MockPeerConnectionInternal>>();
ON_CALL(*pc_, signaling_thread)
.WillByDefault(Return(rtc::Thread::Current()));
- // TODO(tommi): Return a dedicated thread.
- ON_CALL(*pc_, network_thread).WillByDefault(Return(rtc::Thread::Current()));
+ ON_CALL(*pc_, network_thread).WillByDefault(Return(&network_thread_));
}
- ~DataChannelControllerTest() override { run_loop_.Flush(); }
+ ~DataChannelControllerTest() override {
+ run_loop_.Flush();
+ network_thread_.Stop();
+ }
test::RunLoop run_loop_;
+ rtc::Thread network_thread_;
rtc::scoped_refptr<NiceMock<MockPeerConnectionInternal>> pc_;
};
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index f92c05c..dcf9fe4 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -26,9 +26,11 @@
#include "pc/test/fake_data_channel_controller.h"
#include "rtc_base/copy_on_write_buffer.h"
#include "rtc_base/gunit.h"
+#include "rtc_base/null_socket_server.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/thread.h"
#include "test/gtest.h"
+#include "test/run_loop.h"
namespace webrtc {
@@ -71,12 +73,18 @@
size_t on_buffered_amount_change_count_;
};
-// TODO(bugs.webrtc.org/11547): Incorporate a dedicated network thread.
class SctpDataChannelTest : public ::testing::Test {
protected:
SctpDataChannelTest()
- : controller_(new FakeDataChannelController()),
- webrtc_data_channel_(controller_->CreateDataChannel("test", init_)) {}
+ : network_thread_(std::make_unique<rtc::NullSocketServer>()),
+ controller_(new FakeDataChannelController(&network_thread_)),
+ webrtc_data_channel_(controller_->CreateDataChannel("test", init_)) {
+ network_thread_.Start();
+ }
+ ~SctpDataChannelTest() override {
+ run_loop_.Flush();
+ network_thread_.Stop();
+ }
void SetChannelReady() {
controller_->set_transport_available(true);
@@ -92,7 +100,8 @@
webrtc_data_channel_->RegisterObserver(observer_.get());
}
- rtc::AutoThread main_thread_;
+ test::RunLoop run_loop_;
+ rtc::Thread network_thread_;
InternalDataChannelInit init_;
std::unique_ptr<FakeDataChannelController> controller_;
std::unique_ptr<FakeDataChannelObserver> observer_;
@@ -154,6 +163,10 @@
// `Close()` should trigger two state changes, first `kClosing`, then
// `kClose`.
webrtc_data_channel_->Close();
+ // The (simulated) transport close notifications runs on the network thread
+ // and posts a completion notification to the signaling (current) thread.
+ // Allow that ooperation to complete before checking the state.
+ run_loop_.Flush();
EXPECT_EQ(DataChannelInterface::kClosed, webrtc_data_channel_->state());
EXPECT_EQ(observer_->on_state_change_count(), 3u);
EXPECT_TRUE(webrtc_data_channel_->error().ok());
diff --git a/pc/rtc_stats_collector_unittest.cc b/pc/rtc_stats_collector_unittest.cc
index 3b63f2f..5021d19 100644
--- a/pc/rtc_stats_collector_unittest.cc
+++ b/pc/rtc_stats_collector_unittest.cc
@@ -670,7 +670,8 @@
RTCStatsCollectorTest()
: pc_(rtc::make_ref_counted<FakePeerConnectionForStats>()),
stats_(new RTCStatsCollectorWrapper(pc_)),
- data_channel_controller_(new FakeDataChannelController()) {}
+ data_channel_controller_(
+ new FakeDataChannelController(pc_->network_thread())) {}
void ExpectReportContainsCertificateInfo(
const rtc::scoped_refptr<const RTCStatsReport>& report,
@@ -2122,8 +2123,7 @@
EXPECT_EQ(expected, report->Get("P")->cast_to<RTCPeerConnectionStats>());
}
- // TODO(bugs.webrtc.org/11547): Supply a separate network thread.
- FakeDataChannelController controller;
+ FakeDataChannelController controller(pc_->network_thread());
rtc::scoped_refptr<SctpDataChannel> dummy_channel_a = SctpDataChannel::Create(
controller.weak_ptr(), "DummyChannelA", false, InternalDataChannelInit(),
rtc::Thread::Current(), rtc::Thread::Current());
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 24efae6..2280174 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -36,7 +36,7 @@
}
// Define proxy for DataChannelInterface.
-BEGIN_PRIMARY_PROXY_MAP(DataChannel)
+BEGIN_PROXY_MAP(DataChannel)
PROXY_PRIMARY_THREAD_DESTRUCTOR()
PROXY_METHOD1(void, RegisterObserver, DataChannelObserver*)
PROXY_METHOD0(void, UnregisterObserver)
@@ -164,9 +164,11 @@
// static
rtc::scoped_refptr<DataChannelInterface> SctpDataChannel::CreateProxy(
rtc::scoped_refptr<SctpDataChannel> channel) {
- // TODO(bugs.webrtc.org/11547): incorporate the network thread in the proxy.
+ // Copy thread params to local variables before `std::move()`.
auto* signaling_thread = channel->signaling_thread_;
- return DataChannelProxy::Create(signaling_thread, std::move(channel));
+ auto* network_thread = channel->network_thread_;
+ return DataChannelProxy::Create(signaling_thread, network_thread,
+ std::move(channel));
}
SctpDataChannel::SctpDataChannel(
@@ -209,8 +211,9 @@
// Try to connect to the transport in case the transport channel already
// exists.
- if (id_.HasValue()) {
- controller_->AddSctpDataStream(id_);
+ if (id_.HasValue() && connected_to_transport_) {
+ network_thread_->BlockingCall(
+ [c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
}
}
@@ -349,7 +352,10 @@
RTC_DCHECK_EQ(state_, kConnecting);
id_ = sid;
- controller_->AddSctpDataStream(sid);
+ if (connected_to_transport_) {
+ network_thread_->BlockingCall(
+ [c = controller_.get(), sid] { c->AddSctpDataStream(sid); });
+ }
}
void SctpDataChannel::OnClosingProcedureStartedRemotely() {
@@ -385,8 +391,9 @@
// The sid may have been unassigned when controller_->ConnectDataChannel was
// done. So always add the streams even if connected_to_transport_ is true.
- if (id_.HasValue()) {
- controller_->AddSctpDataStream(id_);
+ if (id_.HasValue() && connected_to_transport_) {
+ network_thread_->BlockingCall(
+ [c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
}
}
@@ -563,7 +570,9 @@
// afterwards.
if (!started_closing_procedure_ && controller_ && id_.HasValue()) {
started_closing_procedure_ = true;
- controller_->RemoveSctpDataStream(id_);
+ network_thread_->BlockingCall([c = controller_.get(), sid = id_] {
+ c->RemoveSctpDataStream(sid);
+ });
}
}
} else {
diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h
index 5217505..8d95cc4 100644
--- a/pc/sctp_data_channel.h
+++ b/pc/sctp_data_channel.h
@@ -38,6 +38,10 @@
class SctpDataChannel;
+// Interface that acts as a bridge from the data channel to the transport.
+// TODO(bugs.webrtc.org/11547): The transport operates on the network thread
+// and ultimately all the methods in this interface need to be invoked on the
+// network thread. Currently, some are called on the signaling thread.
class SctpDataChannelControllerInterface {
public:
// Sends the data to the transport.
@@ -45,9 +49,11 @@
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) = 0;
// Adds the data channel SID to the transport for SCTP.
+ // Note: Must be called on the network thread.
virtual void AddSctpDataStream(StreamId sid) = 0;
// Begins the closing procedure by sending an outgoing stream reset. Still
// need to wait for callbacks to tell when this completes.
+ // Note: Must be called on the network thread.
virtual void RemoveSctpDataStream(StreamId sid) = 0;
// Returns true if the transport channel is ready to send data.
virtual bool ReadyToSendData() const = 0;
diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h
index c9775e0..8bb6cef 100644
--- a/pc/test/fake_data_channel_controller.h
+++ b/pc/test/fake_data_channel_controller.h
@@ -21,8 +21,10 @@
class FakeDataChannelController
: public webrtc::SctpDataChannelControllerInterface {
public:
- FakeDataChannelController()
- : send_blocked_(false),
+ explicit FakeDataChannelController(rtc::Thread* network_thread)
+ : signaling_thread_(rtc::Thread::Current()),
+ network_thread_(network_thread),
+ send_blocked_(false),
transport_available_(false),
ready_to_send_(false),
transport_error_(false) {}
@@ -34,15 +36,13 @@
rtc::scoped_refptr<webrtc::SctpDataChannel> CreateDataChannel(
absl::string_view label,
- webrtc::InternalDataChannelInit init,
- rtc::Thread* network_thread = rtc::Thread::Current()) {
- rtc::Thread* signaling_thread = rtc::Thread::Current();
+ webrtc::InternalDataChannelInit init) {
rtc::scoped_refptr<webrtc::SctpDataChannel> channel =
webrtc::SctpDataChannel::Create(weak_ptr(), std::string(label),
transport_available_, init,
- signaling_thread, network_thread);
+ signaling_thread_, network_thread_);
if (ReadyToSendData()) {
- signaling_thread->PostTask(
+ signaling_thread_->PostTask(
SafeTask(signaling_safety_.flag(), [channel = channel] {
if (channel->state() !=
webrtc::DataChannelInterface::DataState::kClosed) {
@@ -73,6 +73,7 @@
}
void AddSctpDataStream(webrtc::StreamId sid) override {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
if (!transport_available_) {
return;
@@ -81,16 +82,19 @@
}
void RemoveSctpDataStream(webrtc::StreamId sid) override {
+ RTC_DCHECK_RUN_ON(network_thread_);
RTC_CHECK(sid.HasValue());
known_stream_ids_.erase(sid);
- // Unlike the real SCTP transport, act like the closing procedure finished
- // instantly, doing the same snapshot thing as below.
- auto it = absl::c_find_if(connected_channels_,
- [&](const auto* c) { return c->sid() == sid; });
- // This path mimics the DCC's OnChannelClosed handler since the FDCC
- // (this class) doesn't have a transport that would do that.
- if (it != connected_channels_.end())
- (*it)->OnClosingProcedureComplete();
+ signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+ // Unlike the real SCTP transport, act like the closing procedure finished
+ // instantly.
+ auto it = absl::c_find_if(connected_channels_,
+ [&](const auto* c) { return c->sid() == sid; });
+ // This path mimics the DCC's OnChannelClosed handler since the FDCC
+ // (this class) doesn't have a transport that would do that.
+ if (it != connected_channels_.end())
+ (*it)->OnClosingProcedureComplete();
+ }));
}
bool ReadyToSendData() const override { return ready_to_send_; }
@@ -159,6 +163,8 @@
int channels_closed() const { return channels_closed_; }
private:
+ rtc::Thread* const signaling_thread_;
+ rtc::Thread* const network_thread_;
int last_sid_;
webrtc::SendDataParams last_send_data_params_;
bool send_blocked_;
diff --git a/pc/test/fake_peer_connection_for_stats.h b/pc/test/fake_peer_connection_for_stats.h
index ca3aa5c..79be6a0 100644
--- a/pc/test/fake_peer_connection_for_stats.h
+++ b/pc/test/fake_peer_connection_for_stats.h
@@ -205,7 +205,8 @@
dependencies_(MakeDependencies()),
context_(ConnectionContext::Create(&dependencies_)),
local_streams_(StreamCollection::Create()),
- remote_streams_(StreamCollection::Create()) {}
+ remote_streams_(StreamCollection::Create()),
+ data_channel_controller_(network_thread_) {}
~FakePeerConnectionForStats() {
for (auto transceiver : transceivers_) {