Create SctpDataChannel objects on the network thread.
* Change data channel creation code to return RTCError for more
detailed/accurate errors.
* Move DataChannelController::sid_allocator_ to the network thread.
* Add a temporary duplicate vector of channels on the network thread.
This will eventually be the main vector.
* Delete one test that turns out to be racy (as long as we're using
both the signaling and network threads).
Bug: webrtc:11547, webrtc:12796
Change-Id: I93ab721a09872d075046a907df60e8aee4263371
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298624
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39719}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index c8e46c7..9f86354 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -21,7 +21,19 @@
namespace webrtc {
-DataChannelController::~DataChannelController() {}
+DataChannelController::~DataChannelController() {
+#if RTC_DCHECK_IS_ON
+ // `sctp_data_channels_n_` might be empty while `sctp_data_channels_` is
+ // not. An example of that is when the `DataChannelController` goes out of
+ // scope with outstanding channels that have been properly terminated on the
+ // network thread but not yet cleared from `sctp_data_channels_`. However,
+ // if `sctp_data_channels_n_` is not empty, then `sctp_data_channels_n_` and
+ // `sctp_data_channels_` should hold the same contents.
+ if (!sctp_data_channels_n_.empty()) {
+ RTC_DCHECK_EQ(sctp_data_channels_n_.size(), sctp_data_channels_.size());
+ }
+#endif
+}
bool DataChannelController::HasDataChannels() const {
RTC_DCHECK_RUN_ON(signaling_thread());
@@ -45,6 +57,7 @@
void DataChannelController::AddSctpDataStream(StreamId sid) {
RTC_DCHECK_RUN_ON(network_thread());
+ RTC_DCHECK(sid.HasValue());
if (data_channel_transport()) {
data_channel_transport()->OpenChannel(sid.stream_id_int());
}
@@ -57,11 +70,6 @@
}
}
-bool DataChannelController::ReadyToSendData() const {
- RTC_DCHECK_RUN_ON(signaling_thread());
- return (data_channel_transport() && data_channel_transport_ready_to_send_);
-}
-
void DataChannelController::OnChannelStateChanged(
SctpDataChannel* channel,
DataChannelInterface::DataState state) {
@@ -106,28 +114,33 @@
void DataChannelController::OnChannelClosed(int channel_id) {
RTC_DCHECK_RUN_ON(network_thread());
- signaling_thread()->PostTask(
- SafeTask(signaling_safety_.flag(), [this, channel_id] {
- RTC_DCHECK_RUN_ON(signaling_thread());
- auto it = FindChannel(StreamId(channel_id));
- // Remove the channel from our list, close it and free up resources.
- if (it != sctp_data_channels_.end()) {
- rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
- // Note: this causes OnSctpDataChannelClosed() to not do anything
- // when called from within `OnClosingProcedureComplete`.
- sctp_data_channels_.erase(it);
- sid_allocator_.ReleaseSid(channel->sid());
+ StreamId sid(channel_id);
+ sid_allocator_.ReleaseSid(sid);
+ auto it = absl::c_find_if(sctp_data_channels_n_,
+ [&](const auto& c) { return c->sid() == sid; });
- channel->OnClosingProcedureComplete();
- }
- }));
+ if (it != sctp_data_channels_n_.end())
+ sctp_data_channels_n_.erase(it);
+
+ signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ auto it = FindChannel(sid);
+ // Remove the channel from our list, close it and free up resources.
+ if (it != sctp_data_channels_.end()) {
+ rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+ // Note: this causes OnSctpDataChannelClosed() to not do anything
+ // when called from within `OnClosingProcedureComplete`.
+ sctp_data_channels_.erase(it);
+
+ channel->OnClosingProcedureComplete();
+ }
+ }));
}
void DataChannelController::OnReadyToSend() {
RTC_DCHECK_RUN_ON(network_thread());
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
- data_channel_transport_ready_to_send_ = true;
auto copy = sctp_data_channels_;
for (const auto& channel : copy)
channel->OnTransportReady();
@@ -158,6 +171,7 @@
data_channel_transport()->SetDataSink(nullptr);
}
set_data_channel_transport(nullptr);
+ sctp_data_channels_n_.clear();
}
void DataChannelController::OnTransportChanged(
@@ -220,141 +234,173 @@
void DataChannelController::OnDataChannelOpenMessage(
const std::string& label,
const InternalDataChannelInit& config) {
- rtc::scoped_refptr<DataChannelInterface> channel(
- InternalCreateDataChannelWithProxy(label, config));
- if (!channel.get()) {
- RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
+ auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
+ if (!channel_or_error.ok()) {
+ RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+ << ToString(channel_or_error.error().type());
return;
}
- pc_->Observer()->OnDataChannel(std::move(channel));
+ pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
pc_->NoteDataAddedEvent();
}
-rtc::scoped_refptr<DataChannelInterface>
+// RTC_RUN_ON(network_thread())
+RTCError DataChannelController::ReserveOrAllocateSid(
+ StreamId& sid,
+ absl::optional<rtc::SSLRole> fallback_ssl_role) {
+ if (sid.HasValue()) {
+ return sid_allocator_.ReserveSid(sid)
+ ? RTCError::OK()
+ : RTCError(RTCErrorType::INVALID_RANGE,
+ "StreamId out of range or reserved.");
+ }
+
+ // Attempt to allocate an ID based on the negotiated role.
+ absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
+ if (!role)
+ role = fallback_ssl_role;
+ if (role) {
+ sid = sid_allocator_.AllocateSid(*role);
+ if (!sid.HasValue())
+ return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
+ }
+ // When we get here, we may still not have an ID, but that's a supported case
+ // whereby an id will be assigned later.
+ RTC_DCHECK(sid.HasValue() || !role);
+ return RTCError::OK();
+}
+
+RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
DataChannelController::InternalCreateDataChannelWithProxy(
const std::string& label,
const InternalDataChannelInit& config) {
RTC_DCHECK_RUN_ON(signaling_thread());
- if (pc_->IsClosed()) {
- return nullptr;
- }
-
- rtc::scoped_refptr<SctpDataChannel> channel =
- InternalCreateSctpDataChannel(label, config);
- if (channel) {
- return SctpDataChannel::CreateProxy(channel);
- }
-
- return nullptr;
-}
-
-rtc::scoped_refptr<SctpDataChannel>
-DataChannelController::InternalCreateSctpDataChannel(
- const std::string& label,
- const InternalDataChannelInit& config) {
- RTC_DCHECK_RUN_ON(signaling_thread());
+ RTC_DCHECK(!pc_->IsClosed());
if (!config.IsValid()) {
- RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
- "invalid DataChannelInit.";
- return nullptr;
+ LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER,
+ "Invalid DataChannelInit");
}
+ bool ready_to_send = false;
InternalDataChannelInit new_config = config;
StreamId sid(new_config.id);
- if (!sid.HasValue()) {
- // TODO(bugs.webrtc.org/11547): Use this call to the network thread more
- // broadly to initialize the channel on the network thread, assign
- // an id and/or other things that belong on the network thread.
- // Move `sid_allocator_` to the network thread.
- absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
- RTC_DCHECK_RUN_ON(network_thread());
- return pc_->GetSctpSslRole_n();
- });
+ auto weak_ptr = weak_factory_.GetWeakPtr();
+ RTC_DCHECK(weak_ptr); // Associate with current thread.
+ auto ret = network_thread()->BlockingCall(
+ [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
+ RTC_DCHECK_RUN_ON(network_thread());
+ RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
+ if (!err.ok())
+ return err;
- if (!role)
- role = new_config.fallback_ssl_role;
+ // In case `sid` has changed. Update `new_config` accordingly.
+ new_config.id = sid.stream_id_int();
+ ready_to_send =
+ data_channel_transport_ && data_channel_transport_->IsReadyToSend();
- if (role) {
- sid = sid_allocator_.AllocateSid(*role);
- if (!sid.HasValue())
- return nullptr;
- }
- // Note that when we get here, the ID may still be invalid.
- } else if (!sid_allocator_.ReserveSid(sid)) {
- RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
- "because the id is already in use or out of range.";
- return nullptr;
- }
- // In case `sid` has changed. Update `new_config` accordingly.
- new_config.id = sid.stream_id_int();
- // TODO(bugs.webrtc.org/11547): The `data_channel_transport_` pointer belongs
- // to the network thread but there are a few places where we check this
- // pointer from the signaling thread. Instead of this approach, we should have
- // a separate channel initialization step that runs on the network thread
- // where we inform the channel of information about whether there's a
- // transport or not, what the role is, and supply an id if any. Subsequently
- // all that state in the channel code, is needed for callbacks from the
- // transport which is already initiated from the network thread. Then we can
- // Remove the trampoline code (see e.g. PostTask() calls in this file) that
- // travels between the signaling and network threads.
- rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
- weak_factory_.GetWeakPtr(), label, data_channel_transport() != nullptr,
- new_config, signaling_thread(), network_thread()));
- RTC_DCHECK(channel);
+ rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+ std::move(weak_ptr), label, data_channel_transport_ != nullptr,
+ new_config, signaling_thread(), network_thread()));
+ RTC_DCHECK(channel);
+ sctp_data_channels_n_.push_back(channel);
- if (ReadyToSendData()) {
+ // If we have an id already, notify the transport.
+ if (sid.HasValue())
+ AddSctpDataStream(sid);
+
+ return channel;
+ });
+
+ if (!ret.ok())
+ return ret.MoveError();
+
+ if (ready_to_send) {
// Checks if the transport is ready to send because the initial channel
// ready signal may have been sent before the DataChannel creation.
// This has to be done async because the upper layer objects (e.g.
// Chrome glue and WebKit) are not wired up properly until after this
// function returns.
signaling_thread()->PostTask(
- SafeTask(signaling_safety_.flag(), [channel = channel] {
+ SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
if (channel->state() != DataChannelInterface::DataState::kClosed)
channel->OnTransportReady();
}));
}
- sctp_data_channels_.push_back(channel);
+ sctp_data_channels_.push_back(ret.value());
has_used_data_channels_ = true;
- return channel;
+ return SctpDataChannel::CreateProxy(ret.MoveValue());
}
void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
RTC_DCHECK_RUN_ON(signaling_thread());
+
+ std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
- for (const auto& channel : sctp_data_channels_) {
- if (!channel->sid().HasValue()) {
- StreamId sid = sid_allocator_.AllocateSid(role);
- if (!sid.HasValue()) {
- channels_to_close.push_back(channel);
- continue;
+
+ network_thread()->BlockingCall([&] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ for (auto it = sctp_data_channels_n_.begin();
+ it != sctp_data_channels_n_.end();) {
+ if (!(*it)->sid().HasValue()) {
+ StreamId sid = sid_allocator_.AllocateSid(role);
+ if (sid.HasValue()) {
+ AddSctpDataStream(sid);
+ channels_to_update.push_back(std::make_pair((*it).get(), sid));
+ } else {
+ channels_to_close.push_back(std::move(*it));
+ it = sctp_data_channels_n_.erase(it);
+ continue;
+ }
}
- // TODO(bugs.webrtc.org/11547): This hides a blocking call to the network
- // thread via AddSctpDataStream. Maybe it's better to move the whole loop
- // to the network thread? Maybe even `sctp_data_channels_`?
- channel->SetSctpSid(sid);
+ ++it;
}
- }
+ });
+
// Since closing modifies the list of channels, we have to do the actual
// closing outside the loop.
for (const auto& channel : channels_to_close) {
channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+ // The channel should now have been removed from sctp_data_channels_.
+ RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+ return c.get() == channel.get();
+ }) == sctp_data_channels_.end());
+ }
+
+ for (auto& pair : channels_to_update) {
+ auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+ return c.get() == pair.first;
+ });
+ RTC_DCHECK(it != sctp_data_channels_.end());
+ (*it)->SetSctpSid(pair.second);
}
}
void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
RTC_DCHECK_RUN_ON(signaling_thread());
+
+ // TODO(tommi): `sid()` should be called on the network thread.
+ // `sid()` and `SctpDataChannel::id_` should have thread guards to enforce
+ // correct usage.
+ network_thread()->BlockingCall([&, sid = channel->sid()] {
+ RTC_DCHECK_RUN_ON(network_thread());
+ // After the closing procedure is done, it's safe to use this ID for
+ // another data channel.
+ if (sid.HasValue())
+ sid_allocator_.ReleaseSid(sid);
+
+ auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+ return c.get() == channel;
+ });
+
+ if (it != sctp_data_channels_n_.end())
+ sctp_data_channels_n_.erase(it);
+ });
+
for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
++it) {
if (it->get() == channel) {
- if (channel->sid().HasValue()) {
- // After the closing procedure is done, it's safe to use this ID for
- // another data channel.
- sid_allocator_.ReleaseSid(channel->sid());
- }
-
// Since this method is triggered by a signal from the DataChannel,
// we can't free it directly here; we need to free it asynchronously.
rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
@@ -416,6 +462,12 @@
void DataChannelController::NotifyDataChannelsOfTransportCreated() {
RTC_DCHECK_RUN_ON(network_thread());
RTC_DCHECK(data_channel_transport());
+
+ // TODO(tommi): Move the blocking call to `AddSctpDataStream` from
+ // `SctpDataChannel::OnTransportChannelCreated` to here and be consistent
+ // with other call sites to `AddSctpDataStream`. We're already
+ // on the right (network) thread here.
+
signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
RTC_DCHECK_RUN_ON(signaling_thread());
auto copy = sctp_data_channels_;