dcsctp: Add metrics support
To support implementing RTCSctpTransportStats, a few metrics are needed.
Some more were added that are useful for metric collection in SFUs.
Bug: webrtc:13052
Change-Id: Idafd49e1084922d01d3e6c5860715f63aea08b7d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228243
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34708}
diff --git a/net/dcsctp/public/dcsctp_socket.h b/net/dcsctp/public/dcsctp_socket.h
index f07f54e..dde0cb6 100644
--- a/net/dcsctp/public/dcsctp_socket.h
+++ b/net/dcsctp/public/dcsctp_socket.h
@@ -155,6 +155,43 @@
}
}
+// Tracked metrics, which is the return value of GetMetrics. Optional members
+// will be unset when they are not yet known.
+struct Metrics {
+ // Transmission stats and metrics.
+
+ // Number of packets sent.
+ size_t tx_packets_count = 0;
+
+ // Number of messages requested to be sent.
+ size_t tx_messages_count = 0;
+
+ // The current congestion window (cwnd) in bytes, corresponding to spinfo_cwnd
+ // defined in RFC6458.
+ absl::optional<size_t> cwnd_bytes = absl::nullopt;
+
+ // Smoothed round trip time, corresponding to spinfo_srtt defined in RFC6458.
+ absl::optional<int> srtt_ms = absl::nullopt;
+
+ // Number of data items in the retransmission queue that haven’t been
+ // acked/nacked yet and are in-flight. Corresponding to sstat_unackdata
+ // defined in RFC6458. This may be an approximation when there are messages in
+ // the send queue that haven't been fragmented/packetized yet.
+ size_t unack_data_count = 0;
+
+ // Receive stats and metrics.
+
+ // Number of packets received.
+ size_t rx_packets_count = 0;
+
+ // Number of messages received.
+ size_t rx_messages_count = 0;
+
+ // The peer’s last announced receiver window size, corresponding to
+ // sstat_rwnd defined in RFC6458.
+ absl::optional<uint32_t> peer_rwnd_bytes = absl::nullopt;
+};
+
// Callbacks that the DcSctpSocket will be done synchronously to the owning
// client. It is allowed to call back into the library from callbacks that start
// with "On". It has been explicitly documented when it's not allowed to call
@@ -350,6 +387,9 @@
// OnBufferedAmountLow event. The default value is zero (0).
virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
size_t bytes) = 0;
+
+ // Retrieves the latest metrics.
+ virtual Metrics GetMetrics() const = 0;
};
} // namespace dcsctp
diff --git a/net/dcsctp/public/mock_dcsctp_socket.h b/net/dcsctp/public/mock_dcsctp_socket.h
index 1814064..b382773 100644
--- a/net/dcsctp/public/mock_dcsctp_socket.h
+++ b/net/dcsctp/public/mock_dcsctp_socket.h
@@ -57,6 +57,8 @@
SetBufferedAmountLowThreshold,
(StreamID stream_id, size_t bytes),
(override));
+
+ MOCK_METHOD(Metrics, GetMetrics, (), (const, override));
};
} // namespace dcsctp
diff --git a/net/dcsctp/socket/dcsctp_socket.cc b/net/dcsctp/socket/dcsctp_socket.cc
index 71bc98c..e2b04af 100644
--- a/net/dcsctp/socket/dcsctp_socket.cc
+++ b/net/dcsctp/socket/dcsctp_socket.cc
@@ -381,6 +381,7 @@
}
TimeMs now = callbacks_.TimeMillis();
+ ++metrics_.tx_messages_count;
send_queue_.Add(now, std::move(message), send_options);
if (tcb_ != nullptr) {
tcb_->SendBufferedPackets(now);
@@ -456,6 +457,26 @@
send_queue_.SetBufferedAmountLowThreshold(stream_id, bytes);
}
+Metrics DcSctpSocket::GetMetrics() const {
+ Metrics metrics = metrics_;
+
+ if (tcb_ != nullptr) {
+ // Update the metrics with some stats that are extracted from
+ // sub-components.
+ metrics.cwnd_bytes = tcb_->cwnd();
+ metrics.srtt_ms = tcb_->current_srtt().value();
+ size_t packet_payload_size =
+ options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
+ metrics.unack_data_count =
+ tcb_->retransmission_queue().outstanding_items() +
+ (send_queue_.total_buffered_amount() + packet_payload_size - 1) /
+ packet_payload_size;
+ metrics.peer_rwnd_bytes = tcb_->retransmission_queue().rwnd();
+ }
+
+ return metrics;
+}
+
void DcSctpSocket::MaybeSendShutdownOnPacketReceived(const SctpPacket& packet) {
if (state_ == State::kShutdownSent) {
bool has_data_chunk =
@@ -588,6 +609,8 @@
}
void DcSctpSocket::ReceivePacket(rtc::ArrayView<const uint8_t> data) {
+ ++metrics_.rx_packets_count;
+
if (packet_observer_ != nullptr) {
packet_observer_->OnReceivedPacket(callbacks_.TimeMillis(), data);
}
@@ -834,6 +857,7 @@
if (packet_observer_ != nullptr) {
packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload);
}
+ ++metrics_.tx_packets_count;
callbacks_.SendPacket(payload);
}
@@ -1267,6 +1291,7 @@
void DcSctpSocket::DeliverReassembledMessages() {
if (tcb_->reassembly_queue().HasMessages()) {
for (auto& message : tcb_->reassembly_queue().FlushMessages()) {
+ ++metrics_.rx_messages_count;
callbacks_.OnMessageReceived(std::move(message));
}
}
diff --git a/net/dcsctp/socket/dcsctp_socket.h b/net/dcsctp/socket/dcsctp_socket.h
index 32e89b5..e0aae38 100644
--- a/net/dcsctp/socket/dcsctp_socket.h
+++ b/net/dcsctp/socket/dcsctp_socket.h
@@ -96,6 +96,8 @@
size_t buffered_amount(StreamID stream_id) const override;
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
+ Metrics GetMetrics() const override;
+
// Returns this socket's verification tag, or zero if not yet connected.
VerificationTag verification_tag() const {
return tcb_ != nullptr ? tcb_->my_verification_tag() : VerificationTag(0);
@@ -245,6 +247,7 @@
const std::string log_prefix_;
const std::unique_ptr<PacketObserver> packet_observer_;
+ Metrics metrics_;
DcSctpOptions options_;
// Enqueues callbacks and dispatches them just before returning to the caller.
diff --git a/net/dcsctp/socket/dcsctp_socket_test.cc b/net/dcsctp/socket/dcsctp_socket_test.cc
index 7ca3d9b..88138f6 100644
--- a/net/dcsctp/socket/dcsctp_socket_test.cc
+++ b/net/dcsctp/socket/dcsctp_socket_test.cc
@@ -1608,5 +1608,110 @@
ExchangeMessages(sock_a_, cb_a_, sock_z_, cb_z_);
}
+TEST_F(DcSctpSocketTest, InitialMetricsAreZeroed) {
+ Metrics metrics = sock_a_.GetMetrics();
+ EXPECT_EQ(metrics.tx_packets_count, 0u);
+ EXPECT_EQ(metrics.tx_messages_count, 0u);
+ EXPECT_EQ(metrics.cwnd_bytes.has_value(), false);
+ EXPECT_EQ(metrics.srtt_ms.has_value(), false);
+ EXPECT_EQ(metrics.unack_data_count, 0u);
+ EXPECT_EQ(metrics.rx_packets_count, 0u);
+ EXPECT_EQ(metrics.rx_messages_count, 0u);
+ EXPECT_EQ(metrics.peer_rwnd_bytes.has_value(), false);
+}
+
+TEST_F(DcSctpSocketTest, RxAndTxPacketMetricsIncrease) {
+ ConnectSockets();
+
+ const size_t initial_a_rwnd = options_.max_receiver_window_buffer_size *
+ ReassemblyQueue::kHighWatermarkLimit;
+
+ EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 2u);
+ EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 2u);
+ EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 0u);
+ EXPECT_EQ(*sock_a_.GetMetrics().cwnd_bytes,
+ options_.cwnd_mtus_initial * options_.mtu);
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+
+ EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 2u);
+ EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 0u);
+
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53), {1, 2}), kSendOptions);
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
+
+ sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
+ sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
+ EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+
+ EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
+
+ EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 3u);
+ EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 3u);
+ EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 1u);
+
+ EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 3u);
+ EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 1u);
+
+ // Send one more (large - fragmented), and receive the delayed SACK.
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
+ std::vector<uint8_t>(options_.mtu * 2 + 1)),
+ kSendOptions);
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 3u);
+
+ sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
+ sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
+
+ sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 1u);
+ EXPECT_GT(*sock_a_.GetMetrics().peer_rwnd_bytes, 0u);
+ EXPECT_LT(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+
+ sock_z_.ReceivePacket(cb_a_.ConsumeSentPacket()); // DATA
+
+ EXPECT_TRUE(cb_z_.ConsumeReceivedMessage().has_value());
+
+ EXPECT_EQ(sock_a_.GetMetrics().tx_packets_count, 6u);
+ EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 4u);
+ EXPECT_EQ(sock_a_.GetMetrics().tx_messages_count, 2u);
+
+ EXPECT_EQ(sock_z_.GetMetrics().rx_packets_count, 6u);
+ EXPECT_EQ(sock_z_.GetMetrics().rx_messages_count, 2u);
+
+ // Delayed sack
+ AdvanceTime(options_.delayed_ack_max_timeout);
+ RunTimers();
+
+ sock_a_.ReceivePacket(cb_z_.ConsumeSentPacket()); // SACK
+ EXPECT_EQ(sock_a_.GetMetrics().unack_data_count, 0u);
+ EXPECT_EQ(sock_a_.GetMetrics().rx_packets_count, 5u);
+ EXPECT_EQ(*sock_a_.GetMetrics().peer_rwnd_bytes, initial_a_rwnd);
+}
+
+TEST_F(DcSctpSocketTest, UnackDataAlsoIncludesSendQueue) {
+ ConnectSockets();
+
+ sock_a_.Send(DcSctpMessage(StreamID(1), PPID(53),
+ std::vector<uint8_t>(kLargeMessageSize)),
+ kSendOptions);
+ size_t payload_bytes =
+ options_.mtu - SctpPacket::kHeaderSize - DataChunk::kHeaderSize;
+
+ size_t expected_sent_packets = options_.cwnd_mtus_initial;
+
+ size_t expected_queued_bytes =
+ kLargeMessageSize - expected_sent_packets * payload_bytes;
+
+ size_t expected_queued_packets = expected_queued_bytes / payload_bytes;
+
+ // Due to alignment, padding etc, it's hard to calculate the exact number, but
+ // it should be in this range.
+ EXPECT_GE(sock_a_.GetMetrics().unack_data_count,
+ expected_sent_packets + expected_queued_packets);
+
+ EXPECT_LE(sock_a_.GetMetrics().unack_data_count,
+ expected_sent_packets + expected_queued_packets + 2);
+}
+
} // namespace
} // namespace dcsctp
diff --git a/net/dcsctp/socket/transmission_control_block.h b/net/dcsctp/socket/transmission_control_block.h
index 172f7c0..8e94b43 100644
--- a/net/dcsctp/socket/transmission_control_block.h
+++ b/net/dcsctp/socket/transmission_control_block.h
@@ -130,6 +130,8 @@
RetransmissionQueue& retransmission_queue() { return retransmission_queue_; }
StreamResetHandler& stream_reset_handler() { return stream_reset_handler_; }
HeartbeatHandler& heartbeat_handler() { return heartbeat_handler_; }
+ size_t cwnd() const { return retransmission_queue_.cwnd(); }
+ DurationMs current_srtt() const { return rto_.srtt(); }
// Returns this socket's verification tag, set in all packet headers.
VerificationTag my_verification_tag() const { return my_verification_tag_; }