Revert "[DataChannel] Send and receive packets on the network thread."
This reverts commit fe53fec24e02d2d644220f913c3f9ae596bbb2d9.
Reason for revert: Speculative revert; the original change may be breaking a downstream project.
Original change's description:
> [DataChannel] Send and receive packets on the network thread.
>
> This updates sctp channels, including work that happens between the
> data channel controller and the transport, to run on the network
> thread. Previously all network traffic related to data channels was
> routed through the signaling thread before going to either the network
> thread or the caller's thread (e.g. the JS thread in Chrome). Now the
> calls can go straight from the network thread to the JS thread by
> enabling a special flag on the observer (see below), and similarly,
> calls to send data involve 2 threads instead of 3.
>
> * Custom data channel observer adapter implementation that
> maintains compatibility with existing observer implementations in
> that notifications are delivered on the signaling thread.
> The adapter can be explicitly disabled for implementations that
> want to optimize the callback path and promise to not block the
> network thread.
> * Remove the signaling thread copy of data channels in the controller.
> * Remove several PostTask operations that were needed to keep things
> in sync (but the need has gone away).
> * Update tests for the controller to consistently call
> TeardownDataChannelTransport_n to match with production.
> * Update stats collectors (current and legacy) to fetch the data
> channel stats on the network thread where they're maintained.
> * Remove the AsyncChannelCloseTeardown test since the async teardown
> step has gone away.
> * Remove `sid_s` in the channel code since we only need the network
> state now.
> * To support custom observers (with and without the data adapter) and
> maintain compatibility with existing implementations, add a new
> proxy macro that allows an implementation to selectively provide
> its own implementation without being proxied. This is used for
> registering/unregistering a data channel observer.
> * Update the data channel proxy to map most methods to the network
> thread, avoiding the interim jump to the signaling thread.
> * Update a plethora of thread checkers from signaling to network.
>
> Bug: webrtc:11547
> Change-Id: Ib4cff1482e31c46008e187189a79e967389bc518
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299142
> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
> Reviewed-by: Henrik Boström <hbos@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#39760}
Bug: webrtc:11547
Change-Id: Id0d65594bf727ccea5c49093c942b09714d101ad
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/300341
Auto-Submit: Andrey Logvin <landrey@webrtc.org>
Owners-Override: Andrey Logvin <landrey@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39764}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index 20d5fe9..36e8be1 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -29,15 +29,8 @@
}
bool DataChannelController::HasDataChannelsForTest() const {
- auto has_channels = [&] {
- RTC_DCHECK_RUN_ON(network_thread());
- return !sctp_data_channels_n_.empty();
- };
-
- if (network_thread()->IsCurrent())
- return has_channels();
-
- return network_thread()->BlockingCall(std::move(has_channels));
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return !sctp_data_channels_.empty();
}
bool DataChannelController::HasUsedDataChannels() const {
@@ -73,15 +66,11 @@
void DataChannelController::OnChannelStateChanged(
SctpDataChannel* channel,
DataChannelInterface::DataState state) {
- RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK_RUN_ON(signaling_thread());
if (state == DataChannelInterface::DataState::kClosed)
OnSctpDataChannelClosed(channel);
- signaling_thread()->PostTask(
- SafeTask(signaling_safety_.flag(),
- [this, channel_id = channel->internal_id(), state = state] {
- pc_->OnSctpDataChannelStateChanged(channel_id, state);
- }));
+ pc_->OnSctpDataChannelStateChanged(channel->internal_id(), state);
}
void DataChannelController::OnDataReceived(
@@ -93,22 +82,27 @@
if (HandleOpenMessage_n(channel_id, type, buffer))
return;
- auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
- return c->sid_n().stream_id_int() == channel_id;
- });
-
- if (it != sctp_data_channels_n_.end())
- (*it)->OnDataReceived(type, buffer);
+ signaling_thread()->PostTask(
+ SafeTask(signaling_safety_.flag(), [this, channel_id, type, buffer] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // TODO(bugs.webrtc.org/11547): The data being received should be
+ // delivered on the network thread.
+ auto it = FindChannel(StreamId(channel_id));
+ if (it != sctp_data_channels_.end())
+ (*it)->OnDataReceived(type, buffer);
+ }));
}
void DataChannelController::OnChannelClosing(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
- auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
- return c->sid_n().stream_id_int() == channel_id;
- });
-
- if (it != sctp_data_channels_n_.end())
- (*it)->OnClosingProcedureStartedRemotely();
+ signaling_thread()->PostTask(
+ SafeTask(signaling_safety_.flag(), [this, channel_id] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ // TODO(bugs.webrtc.org/11547): Should run on the network thread.
+ auto it = FindChannel(StreamId(channel_id));
+ if (it != sctp_data_channels_.end())
+ (*it)->OnClosingProcedureStartedRemotely();
+ }));
}
void DataChannelController::OnChannelClosed(int channel_id) {
@@ -118,44 +112,48 @@
auto it = absl::c_find_if(sctp_data_channels_n_,
[&](const auto& c) { return c->sid_n() == sid; });
- if (it != sctp_data_channels_n_.end()) {
- rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+ if (it != sctp_data_channels_n_.end())
sctp_data_channels_n_.erase(it);
- channel->OnClosingProcedureComplete();
- }
+
+ signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ auto it = FindChannel(sid);
+ // Remove the channel from our list, close it and free up resources.
+ if (it != sctp_data_channels_.end()) {
+ rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+ // Note: this causes OnSctpDataChannelClosed() to not do anything
+ // when called from within `OnClosingProcedureComplete`.
+ sctp_data_channels_.erase(it);
+
+ channel->OnClosingProcedureComplete();
+ }
+ }));
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
- auto copy = sctp_data_channels_n_;
- for (const auto& channel : copy) {
- if (channel->sid_n().HasValue()) {
+ signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ auto copy = sctp_data_channels_;
+ for (const auto& channel : copy)
channel->OnTransportReady();
- } else {
- // This happens for role==SSL_SERVER channels when we get notified by
- // the transport *before* the SDP code calls `AllocateSctpSids` to
- // trigger assignment of sids. In this case OnTransportReady() will be
- // called from within `AllocateSctpSids` below.
- RTC_LOG(LS_INFO) << "OnReadyToSend: Still waiting for an id for channel.";
- }
- }
+ }));
}
void DataChannelController::OnTransportClosed(RTCError error) {
RTC_DCHECK_RUN_ON(network_thread());
- // This loop will close all data channels and trigger a callback to
- // `OnSctpDataChannelClosed` which will modify `sctp_data_channels_n_`, so
- // we create a local copy while we do the fan-out.
- auto copy = sctp_data_channels_n_;
- for (const auto& channel : copy)
- channel->OnTransportChannelClosed(error);
+ signaling_thread()->PostTask(
+ SafeTask(signaling_safety_.flag(), [this, error] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ OnTransportChannelClosed(error);
+ }));
}
void DataChannelController::SetupDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
// There's a new data channel transport. This needs to be signaled to the
- // `sctp_data_channels_n_` so that they can reopen and reconnect. This is
+ // `sctp_data_channels_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@@ -167,12 +165,11 @@
void DataChannelController::TeardownDataChannelTransport_n() {
RTC_DCHECK_RUN_ON(network_thread());
- if (data_channel_transport_) {
- data_channel_transport_->SetDataSink(nullptr);
- set_data_channel_transport(nullptr);
+ if (data_channel_transport()) {
+ data_channel_transport()->SetDataSink(nullptr);
}
+ set_data_channel_transport(nullptr);
sctp_data_channels_n_.clear();
- weak_factory_.InvalidateWeakPtrs();
}
void DataChannelController::OnTransportChanged(
@@ -188,7 +185,7 @@
new_data_channel_transport->SetDataSink(this);
// There's a new data channel transport. This needs to be signaled to the
- // `sctp_data_channels_n_` so that they can reopen and reconnect. This is
+ // `sctp_data_channels_` so that they can reopen and reconnect. This is
// necessary when bundling is applied.
NotifyDataChannelsOfTransportCreated();
}
@@ -197,10 +194,10 @@
std::vector<DataChannelStats> DataChannelController::GetDataChannelStats()
const {
- RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<DataChannelStats> stats;
- stats.reserve(sctp_data_channels_n_.size());
- for (const auto& channel : sctp_data_channels_n_)
+ stats.reserve(sctp_data_channels_.size());
+ for (const auto& channel : sctp_data_channels_)
stats.push_back(channel->GetStats());
return stats;
}
@@ -222,38 +219,28 @@
<< channel_id;
} else {
config.open_handshake_role = InternalDataChannelInit::kAcker;
- auto channel_or_error = CreateDataChannel(label, config);
- if (channel_or_error.ok()) {
- signaling_thread()->PostTask(SafeTask(
- signaling_safety_.flag(),
- [this, channel = channel_or_error.MoveValue(),
- ready_to_send = data_channel_transport_->IsReadyToSend()] {
- RTC_DCHECK_RUN_ON(signaling_thread());
- OnDataChannelOpenMessage(std::move(channel), ready_to_send);
- }));
- } else {
- RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
- << ToString(channel_or_error.error().type());
- }
+ signaling_thread()->PostTask(
+ SafeTask(signaling_safety_.flag(),
+ [this, label = std::move(label), config = std::move(config)] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ OnDataChannelOpenMessage(label, config);
+ }));
}
return true;
}
void DataChannelController::OnDataChannelOpenMessage(
- rtc::scoped_refptr<SctpDataChannel> channel,
- bool ready_to_send) {
- has_used_data_channels_ = true;
- auto proxy = SctpDataChannel::CreateProxy(channel);
-
- pc_->Observer()->OnDataChannel(proxy);
- pc_->NoteDataAddedEvent();
-
- if (ready_to_send) {
- network_thread()->PostTask([channel = std::move(channel)] {
- if (channel->state() != DataChannelInterface::DataState::kClosed)
- channel->OnTransportReady();
- });
+ const std::string& label,
+ const InternalDataChannelInit& config) {
+ auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
+ if (!channel_or_error.ok()) {
+ RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+ << ToString(channel_or_error.error().type());
+ return;
}
+
+ pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
+ pc_->NoteDataAddedEvent();
}
// RTC_RUN_ON(network_thread())
@@ -282,31 +269,6 @@
return RTCError::OK();
}
-// RTC_RUN_ON(network_thread())
-RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>>
-DataChannelController::CreateDataChannel(const std::string& label,
- InternalDataChannelInit& config) {
- StreamId sid(config.id);
- RTCError err = ReserveOrAllocateSid(sid, config.fallback_ssl_role);
- if (!err.ok())
- return err;
-
- // In case `sid` has changed. Update `config` accordingly.
- config.id = sid.stream_id_int();
-
- rtc::scoped_refptr<SctpDataChannel> channel = SctpDataChannel::Create(
- weak_factory_.GetWeakPtr(), label, data_channel_transport_ != nullptr,
- config, signaling_thread(), network_thread());
- RTC_DCHECK(channel);
- sctp_data_channels_n_.push_back(channel);
-
- // If we have an id already, notify the transport.
- if (sid.HasValue())
- AddSctpDataStream(sid);
-
- return channel;
-}
-
RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
DataChannelController::InternalCreateDataChannelWithProxy(
const std::string& label,
@@ -321,25 +283,29 @@
bool ready_to_send = false;
InternalDataChannelInit new_config = config;
StreamId sid(new_config.id);
+ auto weak_ptr = weak_factory_.GetWeakPtr();
+ RTC_DCHECK(weak_ptr); // Associate with current thread.
auto ret = network_thread()->BlockingCall(
[&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
RTC_DCHECK_RUN_ON(network_thread());
- auto channel = CreateDataChannel(label, new_config);
- if (!channel.ok())
- return channel;
+ RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
+ if (!err.ok())
+ return err;
+
+ // In case `sid` has changed. Update `new_config` accordingly.
+ new_config.id = sid.stream_id_int();
ready_to_send =
data_channel_transport_ && data_channel_transport_->IsReadyToSend();
- if (ready_to_send) {
- // If the transport is ready to send because the initial channel
- // ready signal may have been sent before the DataChannel creation.
- // This has to be done async because the upper layer objects (e.g.
- // Chrome glue and WebKit) are not wired up properly until after
- // `InternalCreateDataChannelWithProxy` returns.
- network_thread()->PostTask([channel = channel.value()] {
- if (channel->state() != DataChannelInterface::DataState::kClosed)
- channel->OnTransportReady();
- });
- }
+
+ rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+ std::move(weak_ptr), label, data_channel_transport_ != nullptr,
+ new_config, signaling_thread(), network_thread()));
+ RTC_DCHECK(channel);
+ sctp_data_channels_n_.push_back(channel);
+
+ // If we have an id already, notify the transport.
+ if (sid.HasValue())
+ AddSctpDataStream(sid);
return channel;
});
@@ -347,71 +313,114 @@
if (!ret.ok())
return ret.MoveError();
+ if (ready_to_send) {
+ // Checks if the transport is ready to send because the initial channel
+ // ready signal may have been sent before the DataChannel creation.
+ // This has to be done async because the upper layer objects (e.g.
+ // Chrome glue and WebKit) are not wired up properly until after this
+ // function returns.
+ signaling_thread()->PostTask(
+ SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
+ if (channel->state() != DataChannelInterface::DataState::kClosed)
+ channel->OnTransportReady();
+ }));
+ }
+
+ sctp_data_channels_.push_back(ret.value());
has_used_data_channels_ = true;
return SctpDataChannel::CreateProxy(ret.MoveValue());
}
void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
- RTC_DCHECK_RUN_ON(network_thread());
-
- const bool ready_to_send =
- data_channel_transport_ && data_channel_transport_->IsReadyToSend();
+ RTC_DCHECK_RUN_ON(signaling_thread());
std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
- for (auto it = sctp_data_channels_n_.begin();
- it != sctp_data_channels_n_.end();) {
- if (!(*it)->sid_n().HasValue()) {
- StreamId sid = sid_allocator_.AllocateSid(role);
- if (sid.HasValue()) {
- (*it)->SetSctpSid_n(sid);
- AddSctpDataStream(sid);
- if (ready_to_send) {
- RTC_LOG(LS_INFO) << "AllocateSctpSids: Id assigned, ready to send.";
- (*it)->OnTransportReady();
+
+ network_thread()->BlockingCall([&] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ for (auto it = sctp_data_channels_n_.begin();
+ it != sctp_data_channels_n_.end();) {
+ if (!(*it)->sid_n().HasValue()) {
+ StreamId sid = sid_allocator_.AllocateSid(role);
+ if (sid.HasValue()) {
+ (*it)->SetSctpSid_n(sid);
+ AddSctpDataStream(sid);
+ channels_to_update.push_back(std::make_pair((*it).get(), sid));
+ } else {
+ channels_to_close.push_back(std::move(*it));
+ it = sctp_data_channels_n_.erase(it);
+ continue;
}
- channels_to_update.push_back(std::make_pair((*it).get(), sid));
- } else {
- channels_to_close.push_back(std::move(*it));
- it = sctp_data_channels_n_.erase(it);
- continue;
}
+ ++it;
}
- ++it;
- }
+ });
// Since closing modifies the list of channels, we have to do the actual
// closing outside the loop.
for (const auto& channel : channels_to_close) {
channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+ // The channel should now have been removed from sctp_data_channels_.
+ RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+ return c.get() == channel.get();
+ }) == sctp_data_channels_.end());
+ }
+
+ for (auto& pair : channels_to_update) {
+ auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+ return c.get() == pair.first;
+ });
+ RTC_DCHECK(it != sctp_data_channels_.end());
+ (*it)->SetSctpSid_s(pair.second);
}
}
void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
- RTC_DCHECK_RUN_ON(network_thread());
- // After the closing procedure is done, it's safe to use this ID for
- // another data channel.
- if (channel->sid_n().HasValue()) {
- sid_allocator_.ReleaseSid(channel->sid_n());
+ RTC_DCHECK_RUN_ON(signaling_thread());
+
+ network_thread()->BlockingCall([&] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ // After the closing procedure is done, it's safe to use this ID for
+ // another data channel.
+ if (channel->sid_n().HasValue()) {
+ sid_allocator_.ReleaseSid(channel->sid_n());
+ }
+
+ auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+ return c.get() == channel;
+ });
+
+ if (it != sctp_data_channels_n_.end())
+ sctp_data_channels_n_.erase(it);
+ });
+
+ for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
+ ++it) {
+ if (it->get() == channel) {
+ // Since this method is triggered by a signal from the DataChannel,
+ // we can't free it directly here; we need to free it asynchronously.
+ rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
+ sctp_data_channels_.erase(it);
+ signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(),
+ [release = std::move(release)] {}));
+ return;
+ }
}
- auto it = absl::c_find_if(sctp_data_channels_n_,
- [&](const auto& c) { return c.get() == channel; });
- if (it != sctp_data_channels_n_.end())
- sctp_data_channels_n_.erase(it);
}
void DataChannelController::OnTransportChannelClosed(RTCError error) {
- RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK_RUN_ON(signaling_thread());
// Use a temporary copy of the SCTP DataChannel list because the
// DataChannel may callback to us and try to modify the list.
// TODO(tommi): `OnTransportChannelClosed` is called from
// `SdpOfferAnswerHandler::DestroyDataChannelTransport` just before
// `TeardownDataChannelTransport_n` is called (but on the network thread) from
- // the same function. We can now get rid of this function
- // (OnTransportChannelClosed) and run this loop from within the
- // TeardownDataChannelTransport_n callback.
+ // the same function. Once `sctp_data_channels_` moves to the network thread,
+ // we can get rid of this function (OnTransportChannelClosed) and run this
+ // loop from within the TeardownDataChannelTransport_n callback.
std::vector<rtc::scoped_refptr<SctpDataChannel>> temp_sctp_dcs;
- temp_sctp_dcs.swap(sctp_data_channels_n_);
+ temp_sctp_dcs.swap(sctp_data_channels_);
for (const auto& channel : temp_sctp_dcs) {
channel->OnTransportChannelClosed(error);
}
@@ -435,10 +444,16 @@
StreamId sid,
const SendDataParams& params,
const rtc::CopyOnWriteBuffer& payload) {
- RTC_DCHECK_RUN_ON(network_thread());
+ // TODO(bugs.webrtc.org/11547): Expect method to be called on the network
+ // thread instead. Remove the BlockingCall() below and move associated state
+ // to the network thread.
+ RTC_DCHECK_RUN_ON(signaling_thread());
RTC_DCHECK(data_channel_transport());
- return data_channel_transport()->SendData(sid.stream_id_int(), params,
- payload);
+
+ return network_thread()->BlockingCall([this, sid, params, payload] {
+ return data_channel_transport()->SendData(sid.stream_id_int(), params,
+ payload);
+ });
}
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
@@ -448,8 +463,22 @@
for (const auto& channel : sctp_data_channels_n_) {
if (channel->sid_n().HasValue())
AddSctpDataStream(channel->sid_n());
- channel->OnTransportChannelCreated();
}
+
+ signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ for (const auto& channel : sctp_data_channels_) {
+ channel->OnTransportChannelCreated();
+ }
+ }));
+}
+
+std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
+DataChannelController::FindChannel(StreamId stream_id) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+ return c->sid_s() == stream_id;
+ });
}
rtc::Thread* DataChannelController::network_thread() const {