dcsctp: hand over RRSendQueue streams state

Bug: webrtc:13154
Change-Id: I560b59ad2f5bcd2deafc3a37e3af853108b572b2
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/232605
Commit-Queue: Sergey Sukhanov <sergeysu@webrtc.org>
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35053}
diff --git a/net/dcsctp/packet/chunk/data_chunk.cc b/net/dcsctp/packet/chunk/data_chunk.cc
index cf65f53..769be2d 100644
--- a/net/dcsctp/packet/chunk/data_chunk.cc
+++ b/net/dcsctp/packet/chunk/data_chunk.cc
@@ -93,7 +93,7 @@
              ? "complete"
              : *options().is_beginning ? "first"
                                        : *options().is_end ? "last" : "middle")
-     << ", tsn=" << *tsn() << ", stream_id=" << *stream_id()
+     << ", tsn=" << *tsn() << ", sid=" << *stream_id() << ", ssn=" << *ssn()
      << ", ppid=" << *ppid() << ", length=" << payload().size();
   return sb.Release();
 }
diff --git a/net/dcsctp/packet/chunk/data_chunk_test.cc b/net/dcsctp/packet/chunk/data_chunk_test.cc
index 6a5ca82..def99ceb 100644
--- a/net/dcsctp/packet/chunk/data_chunk_test.cc
+++ b/net/dcsctp/packet/chunk/data_chunk_test.cc
@@ -67,7 +67,7 @@
   EXPECT_THAT(chunk.payload(), ElementsAre(1, 2, 3, 4, 5));
 
   EXPECT_EQ(deserialized.ToString(),
-            "DATA, type=ordered::middle, tsn=123, stream_id=456, ppid=9090, "
+            "DATA, type=ordered::middle, tsn=123, sid=456, ssn=789, ppid=9090, "
             "length=5");
 }
 }  // namespace
diff --git a/net/dcsctp/public/dcsctp_handover_state.h b/net/dcsctp/public/dcsctp_handover_state.h
index 3ad4ab7..370501c 100644
--- a/net/dcsctp/public/dcsctp_handover_state.h
+++ b/net/dcsctp/public/dcsctp_handover_state.h
@@ -43,6 +43,12 @@
   };
   Capabilities capabilities;
 
+  struct OutgoingStream {
+    uint32_t id = 0;
+    uint32_t next_ssn = 0;
+    uint32_t next_unordered_mid = 0;
+    uint32_t next_ordered_mid = 0;
+  };
   struct Transmission {
     uint32_t next_tsn = 0;
     uint32_t next_reset_req_sn = 0;
@@ -50,6 +56,7 @@
     uint32_t rwnd = 0;
     uint32_t ssthresh = 0;
     uint32_t partial_bytes_acked = 0;
+    std::vector<OutgoingStream> streams;
   };
   Transmission tx;
 
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index bfe248c..7910a39 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -302,6 +302,8 @@
           state.capabilities.message_interleaving;
       capabilities.reconfig = state.capabilities.reconfig;
 
+      send_queue_.RestoreFromState(state);
+
       tcb_ = std::make_unique<TransmissionControlBlock>(
           timer_manager_, log_prefix_, options_, capabilities, callbacks_,
           send_queue_, my_verification_tag, TSN(state.my_initial_tsn),
@@ -1619,9 +1621,7 @@
   if (state_ != State::kClosed && state_ != State::kEstablished) {
     status.Add(HandoverUnreadinessReason::kWrongConnectionState);
   }
-  if (!send_queue_.IsEmpty()) {
-    status.Add(HandoverUnreadinessReason::kSendQueueNotEmpty);
-  }
+  status.Add(send_queue_.GetHandoverReadiness());
   if (tcb_) {
     status.Add(tcb_->GetHandoverReadiness());
   }
@@ -1641,6 +1641,7 @@
   } else if (state_ == State::kEstablished) {
     state.socket_state = DcSctpSocketHandoverState::SocketState::kConnected;
     tcb_->AddHandoverState(state);
+    send_queue_.AddHandoverState(state);
     InternalClose(ErrorKind::kNoError, "handover");
     callbacks_.TriggerDeferred();
   }
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 254214e..eaaf34a 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -27,6 +27,19 @@
 
 namespace dcsctp {
 
+RRSendQueue::RRSendQueue(absl::string_view log_prefix,
+                         size_t buffer_size,
+                         std::function<void(StreamID)> on_buffered_amount_low,
+                         size_t total_buffered_amount_low_threshold,
+                         std::function<void()> on_total_buffered_amount_low,
+                         const DcSctpSocketHandoverState* handover_state)
+    : log_prefix_(std::string(log_prefix) + "fcfs: "),
+      buffer_size_(buffer_size),
+      on_buffered_amount_low_(std::move(on_buffered_amount_low)),
+      total_buffered_amount_(std::move(on_total_buffered_amount_low)) {
+  total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
+}
+
 bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
   while (!items_.empty()) {
     RRSendQueue::OutgoingStream::Item& item = items_.front();
@@ -53,6 +66,13 @@
   return false;
 }
 
+void RRSendQueue::OutgoingStream::AddHandoverState(
+    DcSctpSocketHandoverState::OutgoingStream& state) const {
+  state.next_ssn = next_ssn_.value();
+  state.next_ordered_mid = next_ordered_mid_.value();
+  state.next_unordered_mid = next_unordered_mid_.value();
+}
+
 bool RRSendQueue::IsConsistent() const {
   size_t total_buffered_amount = 0;
   for (const auto& stream_entry : streams_) {
@@ -433,4 +453,33 @@
                    total_buffered_amount_))
       .first->second;
 }
+
+HandoverReadinessStatus RRSendQueue::GetHandoverReadiness() const {
+  HandoverReadinessStatus status;
+  if (!IsEmpty()) {
+    status.Add(HandoverUnreadinessReason::kSendQueueNotEmpty);
+  }
+  return status;
+}
+
+void RRSendQueue::AddHandoverState(DcSctpSocketHandoverState& state) {
+  for (const auto& entry : streams_) {
+    DcSctpSocketHandoverState::OutgoingStream state_stream;
+    state_stream.id = entry.first.value();
+    entry.second.AddHandoverState(state_stream);
+    state.tx.streams.push_back(std::move(state_stream));
+  }
+}
+
+void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
+  for (const DcSctpSocketHandoverState::OutgoingStream& state_stream :
+       state.tx.streams) {
+    StreamID stream_id(state_stream.id);
+    streams_.emplace(stream_id, OutgoingStream(
+                                    [this, stream_id]() {
+                                      on_buffered_amount_low_(stream_id);
+                                    },
+                                    total_buffered_amount_, &state_stream));
+  }
+}
 }  // namespace dcsctp
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index ed077cd..94b80d6 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -47,13 +47,8 @@
               size_t buffer_size,
               std::function<void(StreamID)> on_buffered_amount_low,
               size_t total_buffered_amount_low_threshold,
-              std::function<void()> on_total_buffered_amount_low)
-      : log_prefix_(std::string(log_prefix) + "fcfs: "),
-        buffer_size_(buffer_size),
-        on_buffered_amount_low_(std::move(on_buffered_amount_low)),
-        total_buffered_amount_(std::move(on_total_buffered_amount_low)) {
-    total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
-  }
+              std::function<void()> on_total_buffered_amount_low,
+              const DcSctpSocketHandoverState* handover_state = nullptr);
 
   // Indicates if the buffer is full. Note that it's up to the caller to ensure
   // that the buffer is not full prior to adding new items to it.
@@ -86,6 +81,10 @@
   size_t buffered_amount_low_threshold(StreamID stream_id) const override;
   void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
 
+  HandoverReadinessStatus GetHandoverReadiness() const;
+  void AddHandoverState(DcSctpSocketHandoverState& state);
+  void RestoreFromState(const DcSctpSocketHandoverState& state);
+
  private:
   // Represents a value and a "low threshold" that when the value reaches or
   // goes under the "low threshold", will trigger `on_threshold_reached`
@@ -113,9 +112,14 @@
   // Per-stream information.
   class OutgoingStream {
    public:
-    explicit OutgoingStream(std::function<void()> on_buffered_amount_low,
-                            ThresholdWatcher& total_buffered_amount)
-        : buffered_amount_(std::move(on_buffered_amount_low)),
+    explicit OutgoingStream(
+        std::function<void()> on_buffered_amount_low,
+        ThresholdWatcher& total_buffered_amount,
+        const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
+        : next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
+          next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
+          next_ssn_(SSN(state ? state->next_ssn : 0)),
+          buffered_amount_(std::move(on_buffered_amount_low)),
           total_buffered_amount_(total_buffered_amount) {}
 
     // Enqueues a message to this stream.
@@ -150,6 +154,9 @@
     // expired non-partially sent message.
     bool HasDataToSend(TimeMs now);
 
+    void AddHandoverState(
+        DcSctpSocketHandoverState::OutgoingStream& state) const;
+
    private:
     // An enqueued message and metadata.
     struct Item {
@@ -181,10 +188,10 @@
     // Streams are pause when they are about to be reset.
     bool is_paused_ = false;
     // MIDs are different for unordered and ordered messages sent on a stream.
-    MID next_unordered_mid_ = MID(0);
-    MID next_ordered_mid_ = MID(0);
+    MID next_unordered_mid_;
+    MID next_ordered_mid_;
 
-    SSN next_ssn_ = SSN(0);
+    SSN next_ssn_;
     // Enqueued messages, and metadata.
     std::deque<Item> items_;