dcsctp: Support lifecycle events in send queue

The send queue is responsible for generating lifecycle events for all
messages that are still in the queue. Because, if they are still in the
queue, that means that the last fragment of the message hasn't been sent
yet (because then it would have been in the retransmission queue
instead). And if the last fragment hasn't been sent, the send queue is
responsible for generating the
`OnLifecycleMessageExpired(/*maybe_sent=*/false)` event.

Bug: webrtc:5696
Change-Id: Icd5956d6aa0f392cae54f2a05bd20728d9f7f0a6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264144
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37419}
diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc
index 9e45486..b1812f0 100644
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -119,12 +119,11 @@
 }
 
 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
-                                      TimeMs expires_at,
-                                      const SendOptions& send_options) {
+                                      MessageAttributes attributes) {
   bool was_active = bytes_to_send_in_next_message() > 0;
   buffered_amount_.Increase(message.payload().size());
   parent_.total_buffered_amount_.Increase(message.payload().size());
-  items_.emplace_back(std::move(message), expires_at, send_options);
+  items_.emplace_back(std::move(message), std::move(attributes));
 
   if (!was_active) {
     scheduler_stream_->MaybeMakeActive();
@@ -146,19 +145,18 @@
     // Allocate Message ID and SSN when the first fragment is sent.
     if (!item.message_id.has_value()) {
       // Oops, this entire message has already expired. Try the next one.
-      if (item.expires_at <= now) {
-        buffered_amount_.Decrease(item.remaining_size);
-        parent_.total_buffered_amount_.Decrease(item.remaining_size);
+      if (item.attributes.expires_at <= now) {
+        HandleMessageExpired(item);
         items_.pop_front();
         continue;
       }
 
       MID& mid =
-          item.send_options.unordered ? next_unordered_mid_ : next_ordered_mid_;
+          item.attributes.unordered ? next_unordered_mid_ : next_ordered_mid_;
       item.message_id = mid;
       mid = MID(*mid + 1);
     }
-    if (!item.send_options.unordered && !item.ssn.has_value()) {
+    if (!item.attributes.unordered && !item.ssn.has_value()) {
       item.ssn = next_ssn_;
       next_ssn_ = SSN(*next_ssn_ + 1);
     }
@@ -189,16 +187,11 @@
     SendQueue::DataToSend chunk(Data(stream_id, item.ssn.value_or(SSN(0)),
                                      item.message_id.value(), fsn, ppid,
                                      std::move(payload), is_beginning, is_end,
-                                     item.send_options.unordered));
-    if (item.send_options.max_retransmissions.has_value() &&
-        *item.send_options.max_retransmissions >=
-            std::numeric_limits<MaxRetransmits::UnderlyingType>::min() &&
-        *item.send_options.max_retransmissions <=
-            std::numeric_limits<MaxRetransmits::UnderlyingType>::max()) {
-      chunk.max_retransmissions =
-          MaxRetransmits(*item.send_options.max_retransmissions);
-    }
-    chunk.expires_at = item.expires_at;
+                                     item.attributes.unordered));
+    chunk.max_retransmissions = item.attributes.max_retransmissions;
+    chunk.expires_at = item.attributes.expires_at;
+    chunk.lifecycle_id =
+        is_end ? item.attributes.lifecycle_id : LifecycleId::NotSet();
 
     if (is_end) {
       // The entire message has been sent, and its last data copied to `chunk`,
@@ -224,15 +217,28 @@
   return absl::nullopt;
 }
 
+void RRSendQueue::OutgoingStream::HandleMessageExpired(
+    OutgoingStream::Item& item) {
+  buffered_amount_.Decrease(item.remaining_size);
+  parent_.total_buffered_amount_.Decrease(item.remaining_size);
+  if (item.attributes.lifecycle_id.IsSet()) {
+    RTC_DLOG(LS_VERBOSE) << "Triggering OnLifecycleMessageExpired("
+                         << item.attributes.lifecycle_id.value() << ", false)";
+
+    parent_.callbacks_.OnLifecycleMessageExpired(item.attributes.lifecycle_id,
+                                                 /*maybe_delivered=*/false);
+    parent_.callbacks_.OnLifecycleEnd(item.attributes.lifecycle_id);
+  }
+}
+
 bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
                                           MID message_id) {
   bool result = false;
   if (!items_.empty()) {
     Item& item = items_.front();
-    if (item.send_options.unordered == unordered &&
-        item.message_id.has_value() && *item.message_id == message_id) {
-      buffered_amount_.Decrease(item.remaining_size);
-      parent_.total_buffered_amount_.Decrease(item.remaining_size);
+    if (item.attributes.unordered == unordered && item.message_id.has_value() &&
+        *item.message_id == message_id) {
+      HandleMessageExpired(item);
       items_.pop_front();
 
       // Only partially sent messages are discarded, so if a message was
@@ -277,8 +283,7 @@
   // the fragments before actually resetting the stream.
   for (auto it = items_.begin(); it != items_.end();) {
     if (it->remaining_offset == 0) {
-      buffered_amount_.Decrease(it->remaining_size);
-      parent_.total_buffered_amount_.Decrease(it->remaining_size);
+      HandleMessageExpired(*it);
       it = items_.erase(it);
     } else {
       ++it;
@@ -348,15 +353,23 @@
   RTC_DCHECK(!message.payload().empty());
   // Any limited lifetime should start counting from now - when the message
   // has been added to the queue.
-  TimeMs expires_at = TimeMs::InfiniteFuture();
-  if (send_options.lifetime.has_value()) {
-    // `expires_at` is the time when it expires. Which is slightly larger than
-    // the message's lifetime, as the message is alive during its entire
-    // lifetime (which may be zero).
-    expires_at = now + *send_options.lifetime + DurationMs(1);
-  }
+
+  // `expires_at` is the time when it expires. Which is slightly larger than the
+  // message's lifetime, as the message is alive during its entire lifetime
+  // (which may be zero).
+  MessageAttributes attributes = {
+      .unordered = send_options.unordered,
+      .max_retransmissions =
+          send_options.max_retransmissions.has_value()
+              ? MaxRetransmits(send_options.max_retransmissions.value())
+              : MaxRetransmits::NoLimit(),
+      .expires_at = send_options.lifetime.has_value()
+                        ? now + *send_options.lifetime + DurationMs(1)
+                        : TimeMs::InfiniteFuture(),
+      .lifecycle_id = send_options.lifecycle_id,
+  };
   GetOrCreateStreamInfo(message.stream_id())
-      .Add(std::move(message), expires_at, send_options);
+      .Add(std::move(message), std::move(attributes));
   RTC_DCHECK(IsConsistent());
 }
 
diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h
index 9152f27..e9b8cd2 100644
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -45,6 +45,12 @@
 // The send queue may trigger callbacks:
 //  * `OnBufferedAmountLow`, `OnTotalBufferedAmountLow`
 //    These will be triggered as defined in their documentation.
+//  * `OnLifecycleMessageExpired(/*maybe_delivered=*/false)`, `OnLifecycleEnd`
+//    These will be triggered when messages have been expired, abandoned or
+//    discarded from the send queue. If a message is fully produced, meaning
+//    that the last fragment has been produced, the responsibility to send
+//    lifecycle events is then transferred to the retransmission queue, which
+//    is the one asking to produce the message.
 class RRSendQueue : public SendQueue {
  public:
   RRSendQueue(absl::string_view log_prefix,
@@ -96,6 +102,13 @@
   void RestoreFromState(const DcSctpSocketHandoverState& state);
 
  private:
+  struct MessageAttributes {
+    IsUnordered unordered;
+    MaxRetransmits max_retransmissions;
+    TimeMs expires_at;
+    LifecycleId lifecycle_id;
+  };
+
   // Represents a value and a "low threshold" that when the value reaches or
   // goes under the "low threshold", will trigger `on_threshold_reached`
   // callback.
@@ -139,9 +152,7 @@
     StreamID stream_id() const { return scheduler_stream_->stream_id(); }
 
     // Enqueues a message to this stream.
-    void Add(DcSctpMessage message,
-             TimeMs expires_at,
-             const SendOptions& send_options);
+    void Add(DcSctpMessage message, MessageAttributes attributes);
 
     // Implementing `StreamScheduler::StreamProducer`.
     absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
@@ -208,17 +219,13 @@
 
     // An enqueued message and metadata.
     struct Item {
-      explicit Item(DcSctpMessage msg,
-                    TimeMs expires_at,
-                    const SendOptions& send_options)
+      explicit Item(DcSctpMessage msg, MessageAttributes attributes)
           : message(std::move(msg)),
-            expires_at(expires_at),
-            send_options(send_options),
+            attributes(std::move(attributes)),
             remaining_offset(0),
             remaining_size(message.payload().size()) {}
       DcSctpMessage message;
-      TimeMs expires_at;
-      SendOptions send_options;
+      MessageAttributes attributes;
       // The remaining payload (offset and size) to be sent, when it has been
       // fragmented.
       size_t remaining_offset;
@@ -232,6 +239,7 @@
     };
 
     bool IsConsistent() const;
+    void HandleMessageExpired(OutgoingStream::Item& item);
 
     RRSendQueue& parent_;
 
diff --git a/net/dcsctp/tx/rr_send_queue_test.cc b/net/dcsctp/tx/rr_send_queue_test.cc
index 78b5ecd..95416b1 100644
--- a/net/dcsctp/tx/rr_send_queue_test.cc
+++ b/net/dcsctp/tx/rr_send_queue_test.cc
@@ -812,5 +812,55 @@
   EXPECT_FALSE(buf_.Produce(kNow, 1).has_value());
 }
 
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenExpiredInSendQueue) {
+  std::vector<uint8_t> payload(kOneFragmentPacketSize);
+  buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, payload),
+           SendOptions{.lifetime = DurationMs(1000),
+                       .lifecycle_id = LifecycleId(1)});
+
+  EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
+                                                    /*maybe_delivered=*/false));
+  EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
+  EXPECT_FALSE(buf_.Produce(kNow + DurationMs(1001), kOneFragmentPacketSize)
+                   .has_value());
+}
+
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingDuringPause) {
+  std::vector<uint8_t> payload(120);
+
+  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+           SendOptions{.lifecycle_id = LifecycleId(1)});
+  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+           SendOptions{.lifecycle_id = LifecycleId(2)});
+
+  absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
+  ASSERT_TRUE(chunk_one.has_value());
+  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+  EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50);
+
+  EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(2),
+                                                    /*maybe_delivered=*/false));
+  EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(2)));
+  buf_.PrepareResetStream(StreamID(1));
+  EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50);
+}
+
+TEST_F(RRSendQueueTest, WillSendLifecycleExpireWhenDiscardingExplicitly) {
+  std::vector<uint8_t> payload(kOneFragmentPacketSize + 20);
+
+  buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload),
+           SendOptions{.lifecycle_id = LifecycleId(1)});
+
+  absl::optional<SendQueue::DataToSend> chunk_one =
+      buf_.Produce(kNow, kOneFragmentPacketSize);
+  ASSERT_TRUE(chunk_one.has_value());
+  EXPECT_FALSE(chunk_one->data.is_end);
+  EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
+  EXPECT_CALL(callbacks_, OnLifecycleMessageExpired(LifecycleId(1),
+                                                    /*maybe_delivered=*/false));
+  EXPECT_CALL(callbacks_, OnLifecycleEnd(LifecycleId(1)));
+  buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
+               chunk_one->data.message_id);
+}
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h
index a7e6635..0b96e90 100644
--- a/net/dcsctp/tx/send_queue.h
+++ b/net/dcsctp/tx/send_queue.h
@@ -34,6 +34,10 @@
     // Partial reliability - RFC3758
     MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit();
     TimeMs expires_at = TimeMs::InfiniteFuture();
+
+    // Lifecycle - set for the last fragment, and `LifecycleId::NotSet()` for
+    // all other fragments.
+    LifecycleId lifecycle_id = LifecycleId::NotSet();
   };
 
   virtual ~SendQueue() = default;