[SctpDataChannel] Add a copy of the sid for the network thread.

* Rename id_ -> id_s_, add id_n_ and thread guards.
* Same for getters, sid() -> sid_s(), add sid_n()

As more things migrate over to the network thread, we'll only need the
_n variant.

Bug: webrtc:11547
Change-Id: Ic998330f4c81b0f6833967631ac70edc2ca2301c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/299141
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#39724}
diff --git a/pc/data_channel_controller.cc b/pc/data_channel_controller.cc
index e42c969..39a31e6 100644
--- a/pc/data_channel_controller.cc
+++ b/pc/data_channel_controller.cc
@@ -117,7 +117,7 @@
   StreamId sid(channel_id);
   sid_allocator_.ReleaseSid(sid);
   auto it = absl::c_find_if(sctp_data_channels_n_,
-                            [&](const auto& c) { return c->sid() == sid; });
+                            [&](const auto& c) { return c->sid_n() == sid; });
 
   if (it != sctp_data_channels_n_.end())
     sctp_data_channels_n_.erase(it);
@@ -343,9 +343,10 @@
     RTC_DCHECK_RUN_ON(network_thread());
     for (auto it = sctp_data_channels_n_.begin();
          it != sctp_data_channels_n_.end();) {
-      if (!(*it)->sid().HasValue()) {
+      if (!(*it)->sid_n().HasValue()) {
         StreamId sid = sid_allocator_.AllocateSid(role);
         if (sid.HasValue()) {
+          (*it)->SetSctpSid_n(sid);
           AddSctpDataStream(sid);
           channels_to_update.push_back(std::make_pair((*it).get(), sid));
         } else {
@@ -373,22 +374,20 @@
       return c.get() == pair.first;
     });
     RTC_DCHECK(it != sctp_data_channels_.end());
-    (*it)->SetSctpSid(pair.second);
+    (*it)->SetSctpSid_s(pair.second);
   }
 }
 
 void DataChannelController::OnSctpDataChannelClosed(SctpDataChannel* channel) {
   RTC_DCHECK_RUN_ON(signaling_thread());
 
-  // TODO(tommi): `sid()` should be called on the network thread.
-  // `sid()` and `SctpDataChannel::id_`should have thread guards to enforce
-  // correct usage.
-  network_thread()->BlockingCall([&, sid = channel->sid()] {
+  network_thread()->BlockingCall([&] {
     RTC_DCHECK_RUN_ON(network_thread());
     // After the closing procedure is done, it's safe to use this ID for
     // another data channel.
-    if (sid.HasValue())
-      sid_allocator_.ReleaseSid(sid);
+    if (channel->sid_n().HasValue()) {
+      sid_allocator_.ReleaseSid(channel->sid_n());
+    }
 
     auto it = absl::c_find_if(sctp_data_channels_n_, [&](const auto& c) {
       return c.get() == channel;
@@ -463,15 +462,14 @@
   RTC_DCHECK_RUN_ON(network_thread());
   RTC_DCHECK(data_channel_transport());
 
-  // TODO(tommi): Move the blocking call to `AddSctpDataStream` from
-  // `SctpDataChannel::OnTransportChannelCreated` to here and be consistent
-  // with other call sites to `AddSctpDataStream`. We're already
-  // on the right (network) thread here.
+  for (const auto& channel : sctp_data_channels_n_) {
+    if (channel->sid_n().HasValue())
+      AddSctpDataStream(channel->sid_n());
+  }
 
   signaling_thread()->PostTask(SafeTask(signaling_safety_.flag(), [this] {
     RTC_DCHECK_RUN_ON(signaling_thread());
-    auto copy = sctp_data_channels_;
-    for (const auto& channel : copy) {
+    for (const auto& channel : sctp_data_channels_) {
       channel->OnTransportChannelCreated();
     }
   }));
@@ -480,8 +478,9 @@
 std::vector<rtc::scoped_refptr<SctpDataChannel>>::iterator
 DataChannelController::FindChannel(StreamId stream_id) {
   RTC_DCHECK_RUN_ON(signaling_thread());
-  return absl::c_find_if(sctp_data_channels_,
-                         [&](const auto& c) { return c->sid() == stream_id; });
+  return absl::c_find_if(sctp_data_channels_, [&](const auto& c) {
+    return c->sid_s() == stream_id;
+  });
 }
 
 rtc::Thread* DataChannelController::network_thread() const {
diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc
index a970aaa..80a349d 100644
--- a/pc/data_channel_unittest.cc
+++ b/pc/data_channel_unittest.cc
@@ -89,7 +89,7 @@
   void SetChannelReady() {
     controller_->set_transport_available(true);
     webrtc_data_channel_->OnTransportChannelCreated();
-    if (!webrtc_data_channel_->sid().HasValue()) {
+    if (!webrtc_data_channel_->sid_s().HasValue()) {
       SetChannelSid(webrtc_data_channel_, StreamId(0));
     }
     controller_->set_ready_to_send(true);
@@ -105,7 +105,7 @@
     RTC_DCHECK(sid.HasValue());
     network_thread_.BlockingCall(
         [&]() { controller_->AddSctpDataStream(sid); });
-    channel->SetSctpSid(sid);
+    channel->SetSctpSid_s(sid);
   }
 
   void AddObserver() {
@@ -141,11 +141,11 @@
 
   // Check the non-const part of the configuration.
   EXPECT_EQ(webrtc_data_channel_->id(), init_.id);
-  EXPECT_EQ(webrtc_data_channel_->sid(), StreamId());
+  EXPECT_EQ(webrtc_data_channel_->sid_s(), StreamId());
 
   SetChannelReady();
   EXPECT_EQ(webrtc_data_channel_->id(), 0);
-  EXPECT_EQ(webrtc_data_channel_->sid(), StreamId(0));
+  EXPECT_EQ(webrtc_data_channel_->sid_s(), StreamId(0));
 }
 
 // Verifies that the data channel is connected to the transport after creation.
@@ -156,10 +156,10 @@
 
   EXPECT_TRUE(controller_->IsConnected(dc.get()));
   // The sid is not set yet, so it should not have added the streams.
-  EXPECT_FALSE(controller_->IsStreamAdded(dc->sid()));
+  EXPECT_FALSE(controller_->IsStreamAdded(dc->sid_s()));
 
   SetChannelSid(dc, StreamId(0));
-  EXPECT_TRUE(controller_->IsStreamAdded(dc->sid()));
+  EXPECT_TRUE(controller_->IsStreamAdded(dc->sid_s()));
 }
 
 // Tests the state of the data channel.
diff --git a/pc/sctp_data_channel.cc b/pc/sctp_data_channel.cc
index 825f671..623a153 100644
--- a/pc/sctp_data_channel.cc
+++ b/pc/sctp_data_channel.cc
@@ -175,7 +175,8 @@
     rtc::Thread* network_thread)
     : signaling_thread_(signaling_thread),
       network_thread_(network_thread),
-      id_(config.id),
+      id_s_(config.id),
+      id_n_(config.id),
       internal_id_(GenerateUniqueId()),
       label_(label),
       protocol_(config.protocol),
@@ -260,7 +261,8 @@
 }
 
 int SctpDataChannel::id() const {
-  return id_.stream_id_int();
+  RTC_DCHECK_RUN_ON(signaling_thread_);
+  return id_s_.stream_id_int();
 }
 
 Priority SctpDataChannel::priority() const {
@@ -333,14 +335,21 @@
   return true;
 }
 
-void SctpDataChannel::SetSctpSid(const StreamId& sid) {
+void SctpDataChannel::SetSctpSid_s(StreamId sid) {
   RTC_DCHECK_RUN_ON(signaling_thread_);
-  RTC_DCHECK(!id_.HasValue());
+  RTC_DCHECK(!id_s_.HasValue());
   RTC_DCHECK(sid.HasValue());
   RTC_DCHECK_NE(handshake_state_, kHandshakeWaitingForAck);
   RTC_DCHECK_EQ(state_, kConnecting);
 
-  id_ = sid;
+  id_s_ = sid;
+}
+
+void SctpDataChannel::SetSctpSid_n(StreamId sid) {
+  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_DCHECK(!id_n_.HasValue());
+  RTC_DCHECK(sid.HasValue());
+  id_n_ = sid;
 }
 
 void SctpDataChannel::OnClosingProcedureStartedRemotely() {
@@ -370,16 +379,8 @@
 
 void SctpDataChannel::OnTransportChannelCreated() {
   RTC_DCHECK_RUN_ON(signaling_thread_);
-  RTC_DCHECK(controller_);
 
   connected_to_transport_ = true;
-
-  if (id_.HasValue()) {
-    // TODO(bugs.webrtc.org/11547): Move this call over to DCC and do it when we
-    // get the initial notification from the transport, on the network thread.
-    network_thread_->BlockingCall(
-        [c = controller_.get(), sid = id_] { c->AddSctpDataStream(sid); });
-  }
 }
 
 void SctpDataChannel::OnTransportChannelClosed(RTCError error) {
@@ -407,18 +408,18 @@
       // Ignore it if we are not expecting an ACK message.
       RTC_LOG(LS_WARNING)
           << "DataChannel received unexpected CONTROL message, sid = "
-          << id_.stream_id_int();
+          << id_s_.stream_id_int();
       return;
     }
     if (ParseDataChannelOpenAckMessage(payload)) {
       // We can send unordered as soon as we receive the ACK message.
       handshake_state_ = kHandshakeReady;
       RTC_LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
-                       << id_.stream_id_int();
+                       << id_s_.stream_id_int();
     } else {
       RTC_LOG(LS_WARNING)
           << "DataChannel failed to parse OPEN_ACK message, sid = "
-          << id_.stream_id_int();
+          << id_s_.stream_id_int();
     }
     return;
   }
@@ -427,7 +428,7 @@
              type == DataMessageType::kText);
 
   RTC_DLOG(LS_VERBOSE) << "DataChannel received DATA message, sid = "
-                       << id_.stream_id_int();
+                       << id_s_.stream_id_int();
   // We can send unordered as soon as we receive any DATA message since the
   // remote side must have received the OPEN (and old clients do not send
   // OPEN_ACK).
@@ -514,7 +515,7 @@
 
   switch (state_) {
     case kConnecting: {
-      if (connected_to_transport_) {
+      if (connected_to_transport_ && controller_) {
         if (handshake_state_ == kHandshakeShouldSendOpen) {
           rtc::CopyOnWriteBuffer payload;
           WriteDataChannelOpenMessage(label_, protocol_, priority_, ordered_,
@@ -534,7 +535,7 @@
           DeliverQueuedReceivedData();
         }
       } else {
-        RTC_DCHECK(!id_.HasValue());
+        RTC_DCHECK(!id_s_.HasValue());
       }
       break;
     }
@@ -542,7 +543,7 @@
       break;
     }
     case kClosing: {
-      if (connected_to_transport_) {
+      if (connected_to_transport_ && controller_) {
         // Wait for all queued data to be sent before beginning the closing
         // procedure.
         if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
@@ -550,9 +551,9 @@
           // to complete; after calling RemoveSctpDataStream,
           // OnClosingProcedureComplete will end up called asynchronously
           // afterwards.
-          if (!started_closing_procedure_ && controller_ && id_.HasValue()) {
+          if (!started_closing_procedure_ && id_s_.HasValue()) {
             started_closing_procedure_ = true;
-            network_thread_->BlockingCall([c = controller_.get(), sid = id_] {
+            network_thread_->BlockingCall([c = controller_.get(), sid = id_s_] {
               c->RemoveSctpDataStream(sid);
             });
           }
@@ -640,7 +641,7 @@
   send_params.type =
       buffer.binary ? DataMessageType::kBinary : DataMessageType::kText;
 
-  RTCError error = controller_->SendData(id_, send_params, buffer.data);
+  RTCError error = controller_->SendData(id_s_, send_params, buffer.data);
 
   if (error.ok()) {
     ++messages_sent_;
@@ -691,20 +692,12 @@
   }
 }
 
-void SctpDataChannel::QueueControlMessage(
-    const rtc::CopyOnWriteBuffer& buffer) {
-  RTC_DCHECK_RUN_ON(signaling_thread_);
-  queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
-}
-
 bool SctpDataChannel::SendControlMessage(const rtc::CopyOnWriteBuffer& buffer) {
   RTC_DCHECK_RUN_ON(signaling_thread_);
   RTC_DCHECK(connected_to_transport_);
-  RTC_DCHECK(id_.HasValue());
+  RTC_DCHECK(id_s_.HasValue());
+  RTC_DCHECK(controller_);
 
-  if (!controller_) {
-    return false;
-  }
   bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
   RTC_DCHECK(!is_open_message || !negotiated_);
 
@@ -715,10 +708,10 @@
   send_params.ordered = ordered_ || is_open_message;
   send_params.type = DataMessageType::kControl;
 
-  RTCError err = controller_->SendData(id_, send_params, buffer);
+  RTCError err = controller_->SendData(id_s_, send_params, buffer);
   if (err.ok()) {
     RTC_DLOG(LS_VERBOSE) << "Sent CONTROL message on channel "
-                         << id_.stream_id_int();
+                         << id_s_.stream_id_int();
 
     if (handshake_state_ == kHandshakeShouldSendAck) {
       handshake_state_ = kHandshakeReady;
@@ -726,7 +719,7 @@
       handshake_state_ = kHandshakeWaitingForAck;
     }
   } else if (err.type() == RTCErrorType::RESOURCE_EXHAUSTED) {
-    QueueControlMessage(buffer);
+    queued_control_data_.PushBack(std::make_unique<DataBuffer>(buffer, true));
   } else {
     RTC_LOG(LS_ERROR) << "Closing the DataChannel due to a failure to send"
                          " the CONTROL message, send_result = "
diff --git a/pc/sctp_data_channel.h b/pc/sctp_data_channel.h
index f87a3c0..588b0cb 100644
--- a/pc/sctp_data_channel.h
+++ b/pc/sctp_data_channel.h
@@ -192,7 +192,8 @@
 
   // Sets the SCTP sid and adds to transport layer if not set yet. Should only
   // be called once.
-  void SetSctpSid(const StreamId& sid);
+  void SetSctpSid_s(StreamId sid);
+  void SetSctpSid_n(StreamId sid);
 
   // The remote side started the closing procedure by resetting its outgoing
   // stream (our incoming stream). Sets state to kClosing.
@@ -215,7 +216,14 @@
   // stats purposes (see also `GetStats()`).
   int internal_id() const { return internal_id_; }
 
-  const StreamId& sid() const { return id_; }
+  StreamId sid_s() const {
+    RTC_DCHECK_RUN_ON(signaling_thread_);
+    return id_s_;
+  }
+  StreamId sid_n() const {
+    RTC_DCHECK_RUN_ON(network_thread_);
+    return id_n_;
+  }
 
   // Reset the allocator for internal ID values for testing, so that
   // the internal IDs generated are predictable. Test only.
@@ -250,12 +258,12 @@
   bool QueueSendDataMessage(const DataBuffer& buffer);
 
   void SendQueuedControlMessages();
-  void QueueControlMessage(const rtc::CopyOnWriteBuffer& buffer);
   bool SendControlMessage(const rtc::CopyOnWriteBuffer& buffer);
 
   rtc::Thread* const signaling_thread_;
   rtc::Thread* const network_thread_;
-  StreamId id_;
+  StreamId id_s_ RTC_GUARDED_BY(signaling_thread_);
+  StreamId id_n_ RTC_GUARDED_BY(network_thread_);
   const int internal_id_;
   const std::string label_;
   const std::string protocol_;
diff --git a/pc/test/fake_data_channel_controller.h b/pc/test/fake_data_channel_controller.h
index 5a1ce2b..26ecc31 100644
--- a/pc/test/fake_data_channel_controller.h
+++ b/pc/test/fake_data_channel_controller.h
@@ -51,8 +51,8 @@
                   std::move(my_weak_ptr), std::string(label),
                   transport_available_, init, signaling_thread_,
                   network_thread_);
-          if (transport_available_ && channel->sid().HasValue()) {
-            AddSctpDataStream(channel->sid());
+          if (transport_available_ && channel->sid_n().HasValue()) {
+            AddSctpDataStream(channel->sid_n());
           }
           return channel;
         });
@@ -103,8 +103,9 @@
     signaling_thread_->PostTask(SafeTask(signaling_safety_.flag(), [this, sid] {
       // Unlike the real SCTP transport, act like the closing procedure finished
       // instantly.
-      auto it = absl::c_find_if(connected_channels_,
-                                [&](const auto* c) { return c->sid() == sid; });
+      auto it = absl::c_find_if(connected_channels_, [&](const auto* c) {
+        return c->sid_s() == sid;
+      });
       // This path mimics the DCC's OnChannelClosed handler since the FDCC
       // (this class) doesn't have a transport that would do that.
       if (it != connected_channels_.end())