dcsctp: Add metrics support

To support implementing RTCSctpTransportStats, a few metrics are needed.

Some more were added that are useful for metric collection in SFUs.

Bug: webrtc:13052
Change-Id: Idafd49e1084922d01d3e6c5860715f63aea08b7d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228243
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34708}
diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h
index f07f54e..dde0cb6 100644
--- a/net/dcsctp/public/dcsctp_socket.h
+++ b/net/dcsctp/public/dcsctp_socket.h
@@ -155,6 +155,43 @@
   }
 }
 
+// Tracked metrics, which is the return value of GetMetrics. Optional members
+// will be unset when they are not yet known.
+struct Metrics {
+  // Transmission stats and metrics.
+
+  // Number of packets sent.
+  size_t tx_packets_count = 0;
+
+  // Number of messages requested to be sent.
+  size_t tx_messages_count = 0;
+
+  // The current congestion window (cwnd) in bytes, corresponding to spinfo_cwnd
+  // defined in RFC6458.
+  absl::optional<size_t> cwnd_bytes = absl::nullopt;
+
+  // Smoothed round trip time, corresponding to spinfo_srtt defined in RFC6458.
+  absl::optional<int> srtt_ms = absl::nullopt;
+
+  // Number of data items in the retransmission queue that haven’t been
+  // acked/nacked yet and are in-flight. Corresponding to sstat_unackdata
+  // defined in RFC6458. This may be an approximation when there are messages in
+  // the send queue that haven't been fragmented/packetized yet.
+  size_t unack_data_count = 0;
+
+  // Receive stats and metrics.
+
+  // Number of packets received.
+  size_t rx_packets_count = 0;
+
+  // Number of messages received.
+  size_t rx_messages_count = 0;
+
+  // The peer’s last announced receiver window size, corresponding to
+  // sstat_rwnd defined in RFC6458.
+  absl::optional<uint32_t> peer_rwnd_bytes = absl::nullopt;
+};
+
 // Callbacks that the DcSctpSocket will be done synchronously to the owning
 // client. It is allowed to call back into the library from callbacks that start
 // with "On". It has been explicitly documented when it's not allowed to call
@@ -350,6 +387,9 @@
   // OnBufferedAmountLow event. The default value is zero (0).
   virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
                                              size_t bytes) = 0;
+
+  // Retrieves the latest metrics.
+  virtual Metrics GetMetrics() const = 0;
 };
 }  // namespace dcsctp
 
diff --git a/net/dcsctp/public/mock_dcsctp_socket.h b/net/dcsctp/public/mock_dcsctp_socket.h
index 1814064..b382773 100644
--- a/net/dcsctp/public/mock_dcsctp_socket.h
+++ b/net/dcsctp/public/mock_dcsctp_socket.h
@@ -57,6 +57,8 @@
               SetBufferedAmountLowThreshold,
               (StreamID stream_id, size_t bytes),
               (override));
+
+  MOCK_METHOD(Metrics, GetMetrics, (), (const, override));
 };
 
 }  // namespace dcsctp
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 71bc98c..e2b04af 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -381,6 +381,7 @@
   }
 
   TimeMs now = callbacks_.TimeMillis();
+  ++metrics_.tx_messages_count;
   send_queue_.Add(now, std::move(message), send_options);
   if (tcb_ != nullptr) {
     tcb_->SendBufferedPackets(now);
@@ -456,6 +457,26 @@
   send_queue_.SetBufferedAmountLowThreshold(stream_id, bytes);
 }
 
+Metrics DcSctpSocket::GetMetrics() const {
+  Metrics metrics = metrics_;
+
+  if (tcb_ != nullptr) {
+    // Update the metrics with some stats that are extracted from
+    // sub-components.
+    metrics.cwnd_bytes = tcb_->cwnd();
+    metrics.srtt_ms = tcb_->current_srtt().value();
+    size_t packet_payload_size =
+        options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
+    metrics.unack_data_count =
+        tcb_->retransmission_queue().outstanding_items() +
+        (send_queue_.total_buffered_amount() + packet_payload_size - 1) /
+            packet_payload_size;
+    metrics.peer_rwnd_bytes = tcb_->retransmission_queue().rwnd();
+  }
+
+  return metrics;
+}
+
 void DcSctpSocket::MaybeSendShutdownOnPacketReceived(const SctpPacket& packet) {
   if (state_ == State::kShutdownSent) {
     bool has_data_chunk =
@@ -588,6 +609,8 @@
 }
 
 void DcSctpSocket::ReceivePacket(rtc::ArrayView<const uint8_t> data) {
+  ++metrics_.rx_packets_count;
+
   if (packet_observer_ != nullptr) {
     packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data);
   }
@@ -834,6 +857,7 @@
   if (packet_observer_ != nullptr) {
     packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload);
   }
+  ++metrics_.tx_packets_count;
   callbacks_.SendPacket(payload);
 }
 
@@ -1267,6 +1291,7 @@
 void DcSctpSocket::DeliverReassembledMessages() {
   if (tcb_->reassembly_queue().HasMessages()) {
     for (auto& message : tcb_->reassembly_queue().FlushMessages()) {
+      ++metrics_.rx_messages_count;
       callbacks_.OnMessageReceived(std::move(message));
     }
   }
diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h
index 32e89b5..e0aae38 100644
--- a/net/dcsctp/socket/dcsctp_socket.h
+++ b/net/dcsctp/socket/dcsctp_socket.h
@@ -96,6 +96,8 @@
   size_t buffered_amount(StreamID stream_id) const override;
   size_t buffered_amount_low_threshold(StreamID stream_id) const override;
   void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
+  Metrics GetMetrics() const override;
+
   // Returns this socket's verification tag, or zero if not yet connected.
   VerificationTag verification_tag() const {
     return tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0);
@@ -245,6 +247,7 @@
 
   const std::string log_prefix_;
   const std::unique_ptr<PacketObserver> packet_observer_;
+  Metrics metrics_;
   DcSctpOptions options_;
 
   // Enqueues callbacks and dispatches them just before returning to the caller.
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index 7ca3d9b..88138f6 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1608,5 +1608,110 @@
   ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
 }
 
+TEST_F(DcSctpSocketTest, InitialMetricsAreZeroed) {
+  Metrics metrics = sock_a_.GetMetrics();
+  EXPECT_EQ(metrics.tx_packets_count, 0u);
+  EXPECT_EQ(metrics.tx_messages_count, 0u);
+  EXPECT_EQ(metrics.cwnd_bytes.has_value(), false);
+  EXPECT_EQ(metrics.srtt_ms.has_value(), false);
+  EXPECT_EQ(metrics.unack_data_count, 0u);
+  EXPECT_EQ(metrics.rx_packets_count, 0u);
+  EXPECT_EQ(metrics.rx_messages_count, 0u);
+  EXPECT_EQ(metrics.peer_rwnd_bytes.has_value(), false);
+}
+
+TEST_F(DcSctpSocketTest, RxAndTxPacketMetricsIncrease) {
+  ConnectSockets();
+
+  const size_t initial_a_rwnd = options_.max_receiver_window_buffer_size *
+                                ReassemblyQueue::kHighWatermarkLimit;
+
+  EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 2u);
+  EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 2u);
+  EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 0u);
+  EXPECT_EQ(*sock_a_.GetMetrics().cwnd_bytes,
+            options_.cwnd_mtus_initial * options_.mtu);
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+
+  EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 2u);
+  EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 0u);
+
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions);
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
+
+  sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());  // DATA
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());  // SACK
+  EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+
+  EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
+
+  EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 3u);
+  EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 3u);
+  EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 1u);
+
+  EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 3u);
+  EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 1u);
+
+  // Send one more (large - fragmented), and receive the delayed SACK.
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
+                             std::vector<uint8_t>(options_.mtu * 2 + 1)),
+               kSendOptions);
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 3u);
+
+  sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());  // DATA
+  sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());  // DATA
+
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());  // SACK
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
+  EXPECT_GT(*sock_a_.GetMetrics().peer_rwnd_bytes, 0u);
+  EXPECT_LT(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+
+  sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket());  // DATA
+
+  EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
+
+  EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 6u);
+  EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 4u);
+  EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 2u);
+
+  EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 6u);
+  EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 2u);
+
+  // Delayed sack
+  AdvanceTime(options_.delayed_ack_max_timeout);
+  RunTimers();
+
+  sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket());  // SACK
+  EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+  EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 5u);
+  EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+}
+
+TEST_F(DcSctpSocketTest, UnackDataAlsoIncludesSendQueue) {
+  ConnectSockets();
+
+  sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
+                             std::vector<uint8_t>(kLargeMessageSize)),
+               kSendOptions);
+  size_t payload_bytes =
+      options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
+
+  size_t expected_sent_packets = options_.cwnd_mtus_initial;
+
+  size_t expected_queued_bytes =
+      kLargeMessageSize - expected_sent_packets * payload_bytes;
+
+  size_t expected_queued_packets = expected_queued_bytes / payload_bytes;
+
+  // Due to alignment, padding etc, it's hard to calculate the exact number, but
+  // it should be in this range.
+  EXPECT_GE(sock_a_.GetMetrics().unack_data_count,
+            expected_sent_packets + expected_queued_packets);
+
+  EXPECT_LE(sock_a_.GetMetrics().unack_data_count,
+            expected_sent_packets + expected_queued_packets + 2);
+}
+
 }  // namespace
 }  // namespace dcsctp
diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h
index 172f7c0..8e94b43 100644
--- a/net/dcsctp/socket/transmission_control_block.h
+++ b/net/dcsctp/socket/transmission_control_block.h
@@ -130,6 +130,8 @@
   RetransmissionQueue& retransmission_queue() { return retransmission_queue_; }
   StreamResetHandler& stream_reset_handler() { return stream_reset_handler_; }
   HeartbeatHandler& heartbeat_handler() { return heartbeat_handler_; }
+  size_t cwnd() const { return retransmission_queue_.cwnd(); }
+  DurationMs current_srtt() const { return rto_.srtt(); }
 
   // Returns this socket's verification tag, set in all packet headers.
   VerificationTag my_verification_tag() const { return my_verification_tag_; }