Create SctpDataChannel objects on the network thread.

* Change data channel creation code to return RTCError for more
  detailed/accurate errors.
* Move DataChannelController::sid_allocator_ to the network thread.
* Add a temporary duplicate vector of channels on the network thread.
  This will eventually be the main vector.
* Delete one test that turns out to be racy (as long as we're using
  both the signaling and network threads).

Bug: webrtc:11547, webrtc:12796
Change-Id: I93ab721a09872d075046a907df60e8aee4263371
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/298624
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39719}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index c8e46c7..9f86354 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -21,7 +21,19 @@
 
 namespace webrtc {
 
-DataChannelController::~DataChannelController() {}
+DataChannelController::~DataChannelController() {
+#if RTC_DCHECK_IS_ON
+  // `sctp_data_channels_n_` might be empty while `sctp_data_channels_` is
+  // not. An example of that is when the `DataChannelController` goes out of
+  // scope with outstanding channels that have been properly terminated on the
+  // network thread but not yet cleared from `sctp_data_channels_`. However,
+  // if `sctp_data_channels_n_` is not empty, then `sctp_data_channels_n_` and
+  // `sctp_data_channels_` should hold the same contents.
+  if (!sctp_data_channels_n_.empty()) {
+    RTC_DCHECK_EQ(sctp_data_channels_n_.size(), sctp_data_channels_.size());
+  }
+#endif
+}
 
 bool DataChannelController::HasDataChannels() const {
   RTC_DCHECK_RUN_ON(signaling_thread());
@@ -45,6 +57,7 @@
 
 void DataChannelController::AddSctpDataStream(StreamId sid) {
   RTC_DCHECK_RUN_ON(network_thread());
+  RTC_DCHECK(sid.HasValue());
   if (data_channel_transport()) {
     data_channel_transport()->OpenChannel(sid.stream_id_int());
   }
@@ -57,11 +70,6 @@
   }
 }
 
-bool DataChannelController::ReadyToSendData() const {
-  RTC_DCHECK_RUN_ON(signaling_thread());
-  return (data_channel_transport() && data_channel_transport_ready_to_send_);
-}
-
 void DataChannelController::OnChannelStateChanged(
     SctpDataChannel* channel,
     DataChannelInterface::DataState state) {
@@ -106,28 +114,33 @@
 
 void DataChannelController::OnChannelClosed(int channel_id) {
   RTC_DCHECK_RUN_ON(network_thread());
-  signaling_thread()->PostTask(
-      SafeTask(signaling_safety_.flag(), [this, channel_id] {
-        RTC_DCHECK_RUN_ON(signaling_thread());
-        auto it = FindChannel(StreamId(channel_id));
-        // Remove the channel from our list, close it and free up resources.
-        if (it != sctp_data_channels_.end()) {
-          rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
-          // Note: this causes OnSctpDataChannelClosed() to not do anything
-          // when called from within `OnClosingProcedureComplete`.
-          sctp_data_channels_.erase(it);
-          sid_allocator_.ReleaseSid(channel->sid());
+  StreamId sid(channel_id);
+  sid_allocator_.ReleaseSid(sid);
+  auto it = absl::c_find_if(sctp_data_channels_n_,
+                            [&](const auto& c) { return c->sid() == sid; });
 
-          channel->OnClosingProcedureComplete();
-        }
-      }));
+  if (it != sctp_data_channels_n_.end())
+    sctp_data_channels_n_.erase(it);
+
+  signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
+    RTC_DCHECK_RUN_ON(signaling_thread());
+    auto it = FindChannel(sid);
+    // Remove the channel from our list, close it and free up resources.
+    if (it != sctp_data_channels_.end()) {
+      rtc::scoped_refptr<SctpDataChannel> channel = std::move(*it);
+      // Note: this causes OnSctpDataChannelClosed() to not do anything
+      // when called from within `OnClosingProcedureComplete`.
+      sctp_data_channels_.erase(it);
+
+      channel->OnClosingProcedureComplete();
+    }
+  }));
 }
 
 void DataChannelController::OnReadyToSend() {
   RTC_DCHECK_RUN_ON(network_thread());
   signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
     RTC_DCHECK_RUN_ON(signaling_thread());
-    data_channel_transport_ready_to_send_ = true;
     auto copy = sctp_data_channels_;
     for (const auto& channel : copy)
       channel->OnTransportReady();
@@ -158,6 +171,7 @@
     data_channel_transport()->SetDataSink(nullptr);
   }
   set_data_channel_transport(nullptr);
+  sctp_data_channels_n_.clear();
 }
 
 void DataChannelController::OnTransportChanged(
@@ -220,141 +234,173 @@
 void DataChannelController::OnDataChannelOpenMessage(
     const std::string& label,
     const InternalDataChannelInit& config) {
-  rtc::scoped_refptr<DataChannelInterface> channel(
-      InternalCreateDataChannelWithProxy(label, config));
-  if (!channel.get()) {
-    RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message.";
+  auto channel_or_error = InternalCreateDataChannelWithProxy(label, config);
+  if (!channel_or_error.ok()) {
+    RTC_LOG(LS_ERROR) << "Failed to create DataChannel from the OPEN message."
+                      << ToString(channel_or_error.error().type());
     return;
   }
 
-  pc_->Observer()->OnDataChannel(std::move(channel));
+  pc_->Observer()->OnDataChannel(channel_or_error.MoveValue());
   pc_->NoteDataAddedEvent();
 }
 
-rtc::scoped_refptr<DataChannelInterface>
+// RTC_RUN_ON(network_thread())
+RTCError DataChannelController::ReserveOrAllocateSid(
+    StreamId& sid,
+    absl::optional<rtc::SSLRole> fallback_ssl_role) {
+  if (sid.HasValue()) {
+    return sid_allocator_.ReserveSid(sid)
+               ? RTCError::OK()
+               : RTCError(RTCErrorType::INVALID_RANGE,
+                          "StreamId out of range or reserved.");
+  }
+
+  // Attempt to allocate an ID based on the negotiated role.
+  absl::optional<rtc::SSLRole> role = pc_->GetSctpSslRole_n();
+  if (!role)
+    role = fallback_ssl_role;
+  if (role) {
+    sid = sid_allocator_.AllocateSid(*role);
+    if (!sid.HasValue())
+      return RTCError(RTCErrorType::RESOURCE_EXHAUSTED);
+  }
+  // When we get here, we may still not have an ID, but that's a supported case
+  // whereby an id will be assigned later.
+  RTC_DCHECK(sid.HasValue() || !role);
+  return RTCError::OK();
+}
+
+RTCErrorOr<rtc::scoped_refptr<DataChannelInterface>>
 DataChannelController::InternalCreateDataChannelWithProxy(
     const std::string& label,
     const InternalDataChannelInit& config) {
   RTC_DCHECK_RUN_ON(signaling_thread());
-  if (pc_->IsClosed()) {
-    return nullptr;
-  }
-
-  rtc::scoped_refptr<SctpDataChannel> channel =
-      InternalCreateSctpDataChannel(label, config);
-  if (channel) {
-    return SctpDataChannel::CreateProxy(channel);
-  }
-
-  return nullptr;
-}
-
-rtc::scoped_refptr<SctpDataChannel>
-DataChannelController::InternalCreateSctpDataChannel(
-    const std::string& label,
-    const InternalDataChannelInit& config) {
-  RTC_DCHECK_RUN_ON(signaling_thread());
+  RTC_DCHECK(!pc_->IsClosed());
   if (!config.IsValid()) {
-    RTC_LOG(LS_ERROR) << "Failed to initialize the SCTP data channel due to "
-                         "invalid DataChannelInit.";
-    return nullptr;
+    LOG_AND_RETURN_ERROR(RTCErrorType::INVALID_PARAMETER,
+                         "Invalid DataChannelInit");
   }
 
+  bool ready_to_send = false;
   InternalDataChannelInit new_config = config;
   StreamId sid(new_config.id);
-  if (!sid.HasValue()) {
-    // TODO(bugs.webrtc.org/11547): Use this call to the network thread more
-    // broadly to initialize the channel on the network thread, assign
-    // an id and/or other things that belong on the network thread.
-    // Move `sid_allocator_` to the network thread.
-    absl::optional<rtc::SSLRole> role = network_thread()->BlockingCall([this] {
-      RTC_DCHECK_RUN_ON(network_thread());
-      return pc_->GetSctpSslRole_n();
-    });
+  auto weak_ptr = weak_factory_.GetWeakPtr();
+  RTC_DCHECK(weak_ptr);  // Associate with current thread.
+  auto ret = network_thread()->BlockingCall(
+      [&]() -> RTCErrorOr<rtc::scoped_refptr<SctpDataChannel>> {
+        RTC_DCHECK_RUN_ON(network_thread());
+        RTCError err = ReserveOrAllocateSid(sid, new_config.fallback_ssl_role);
+        if (!err.ok())
+          return err;
 
-    if (!role)
-      role = new_config.fallback_ssl_role;
+        // In case `sid` has changed. Update `new_config` accordingly.
+        new_config.id = sid.stream_id_int();
+        ready_to_send =
+            data_channel_transport_ && data_channel_transport_->IsReadyToSend();
 
-    if (role) {
-      sid = sid_allocator_.AllocateSid(*role);
-      if (!sid.HasValue())
-        return nullptr;
-    }
-    // Note that when we get here, the ID may still be invalid.
-  } else if (!sid_allocator_.ReserveSid(sid)) {
-    RTC_LOG(LS_ERROR) << "Failed to create a SCTP data channel "
-                         "because the id is already in use or out of range.";
-    return nullptr;
-  }
-  // In case `sid` has changed. Update `new_config` accordingly.
-  new_config.id = sid.stream_id_int();
-  // TODO(bugs.webrtc.org/11547): The `data_channel_transport_` pointer belongs
-  // to the network thread but there are a few places where we check this
-  // pointer from the signaling thread. Instead of this approach, we should have
-  // a separate channel initialization step that runs on the network thread
-  // where we inform the channel of information about whether there's a
-  // transport or not, what the role is, and supply an id if any. Subsequently
-  // all that state in the channel code, is needed for callbacks from the
-  // transport which is already initiated from the network thread. Then we can
-  // Remove the trampoline code (see e.g. PostTask() calls in this file) that
-  // travels between the signaling and network threads.
-  rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
-      weak_factory_.GetWeakPtr(), label, data_channel_transport() != nullptr,
-      new_config, signaling_thread(), network_thread()));
-  RTC_DCHECK(channel);
+        rtc::scoped_refptr<SctpDataChannel> channel(SctpDataChannel::Create(
+            std::move(weak_ptr), label, data_channel_transport_ != nullptr,
+            new_config, signaling_thread(), network_thread()));
+        RTC_DCHECK(channel);
+        sctp_data_channels_n_.push_back(channel);
 
-  if (ReadyToSendData()) {
+        // If we have an id already, notify the transport.
+        if (sid.HasValue())
+          AddSctpDataStream(sid);
+
+        return channel;
+      });
+
+  if (!ret.ok())
+    return ret.MoveError();
+
+  if (ready_to_send) {
     // Checks if the transport is ready to send because the initial channel
     // ready signal may have been sent before the DataChannel creation.
     // This has to be done async because the upper layer objects (e.g.
     // Chrome glue and WebKit) are not wired up properly until after this
     // function returns.
     signaling_thread()->PostTask(
-        SafeTask(signaling_safety_.flag(), [channel = channel] {
+        SafeTask(signaling_safety_.flag(), [channel = ret.value()] {
           if (channel->state() != DataChannelInterface::DataState::kClosed)
             channel->OnTransportReady();
         }));
   }
 
-  sctp_data_channels_.push_back(channel);
+  sctp_data_channels_.push_back(ret.value());
   has_used_data_channels_ = true;
-  return channel;
+  return SctpDataChannel::CreateProxy(ret.MoveValue());
 }
 
 void DataChannelController::AllocateSctpSids(rtc::SSLRole role) {
   RTC_DCHECK_RUN_ON(signaling_thread());
+
+  std::vector<std::pair<SctpDataChannel*, StreamId>> channels_to_update;
   std::vector<rtc::scoped_refptr<SctpDataChannel>> channels_to_close;
-  for (const auto& channel : sctp_data_channels_) {
-    if (!channel->sid().HasValue()) {
-      StreamId sid = sid_allocator_.AllocateSid(role);
-      if (!sid.HasValue()) {
-        channels_to_close.push_back(channel);
-        continue;
+
+  network_thread()->BlockingCall([&] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    for (auto it = sctp_data_channels_n_.begin();
+         it != sctp_data_channels_n_.end();) {
+      if (!(*it)->sid().HasValue()) {
+        StreamId sid = sid_allocator_.AllocateSid(role);
+        if (sid.HasValue()) {
+          AddSctpDataStream(sid);
+          channels_to_update.push_back(std::make_pair((*it).get(), sid));
+        } else {
+          channels_to_close.push_back(std::move(*it));
+          it = sctp_data_channels_n_.erase(it);
+          continue;
+        }
       }
-      // TODO(bugs.webrtc.org/11547): This hides a blocking call to the network
-      // thread via AddSctpDataStream. Maybe it's better to move the whole loop
-      // to the network thread? Maybe even `sctp_data_channels_`?
-      channel->SetSctpSid(sid);
+      ++it;
     }
-  }
+  });
+
   // Since closing modifies the list of channels, we have to do the actual
   // closing outside the loop.
   for (const auto& channel : channels_to_close) {
     channel->CloseAbruptlyWithDataChannelFailure("Failed to allocate SCTP SID");
+    // The channel should now have been removed from sctp_data_channels_.
+    RTC_DCHECK(absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+                 return c.get() == channel.get();
+               }) == sctp_data_channels_.end());
+  }
+
+  for (auto& pair : channels_to_update) {
+    auto it = absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+      return c.get() == pair.first;
+    });
+    RTC_DCHECK(it != sctp_data_channels_.end());
+    (*it)->SetSctpSid(pair.second);
   }
 }
 
 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
   RTC_DCHECK_RUN_ON(signaling_thread());
+
+  // TODO(tommi): `sid()` should be called on the network thread.
+  // `sid()` and `SctpDataChannel::id_` should have thread guards to enforce
+  // correct usage.
+  network_thread()->BlockingCall([&, sid = channel->sid()] {
+    RTC_DCHECK_RUN_ON(network_thread());
+    // After the closing procedure is done, it's safe to use this ID for
+    // another data channel.
+    if (sid.HasValue())
+      sid_allocator_.ReleaseSid(sid);
+
+    auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
+      return c.get() == channel;
+    });
+
+    if (it != sctp_data_channels_n_.end())
+      sctp_data_channels_n_.erase(it);
+  });
+
   for (auto it = sctp_data_channels_.begin(); it != sctp_data_channels_.end();
        ++it) {
     if (it->get() == channel) {
-      if (channel->sid().HasValue()) {
-        // After the closing procedure is done, it's safe to use this ID for
-        // another data channel.
-        sid_allocator_.ReleaseSid(channel->sid());
-      }
-
       // Since this method is triggered by a signal from the DataChannel,
       // we can't free it directly here; we need to free it asynchronously.
       rtc::scoped_refptr<SctpDataChannel> release = std::move(*it);
@@ -416,6 +462,12 @@
 void DataChannelController::NotifyDataChannelsOfTransportCreated() {
   RTC_DCHECK_RUN_ON(network_thread());
   RTC_DCHECK(data_channel_transport());
+
+  // TODO(tommi): Move the blocking call to `AddSctpDataStream` from
+  // `SctpDataChannel::OnTransportChannelCreated` to here and be consistent
+  // with other call sites to `AddSctpDataStream`. We're already
+  // on the right (network) thread here.
+
   signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
     RTC_DCHECK_RUN_ON(signaling_thread());
     auto copy = sctp_data_channels_;