Add a callback to use StreamStats::rtp_stats as the source of truth.

Before the change RtpSenderEgress::rtp_stats_ and RtpSenderEgress::rtx_rtp_stats_ are updated in RtpSenderEgress and copied in StreamStats::rtp_stats.

After the change StreamStats::rtp_stats is directly used in RtpSenderEgress by using the added StreamDataCountersCallback::GetDataCounters.

An impact of this change is that FEC will now have its own counters.
Before the change FEC was using the RTP counters because of this code:
https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/modules/rtp_rtcp/source/rtp_sender_egress.cc;l=467-468.

This refactoring is meant to simplify https://webrtc-review.googlesource.com/c/src/+/381100.

Change-Id: I4913e9311fe35d1b2ca1ae0c0945e8e6e4cd7f5d
Bug: webrtc:40644448
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/381241
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Jeremy Leconte <jleconte@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#44131}
diff --git a/modules/rtp_rtcp/include/rtp_rtcp_defines.h b/modules/rtp_rtcp/include/rtp_rtcp_defines.h
index ca6e725..ba04483 100644
--- a/modules/rtp_rtcp/include/rtp_rtcp_defines.h
+++ b/modules/rtp_rtcp/include/rtp_rtcp_defines.h
@@ -380,6 +380,10 @@
  public:
   virtual ~StreamDataCountersCallback() {}
 
+  // TODO: webrtc:40644448 - Make this pure virtual.
+  virtual StreamDataCounters GetDataCounters(uint32_t ssrc) const {
+    RTC_CHECK_NOTREACHED();
+  }
   virtual void DataCountersUpdated(const StreamDataCounters& counters,
                                    uint32_t ssrc) = 0;
 };
diff --git a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.cc b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.cc
index fceb541..25a4f14 100644
--- a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.cc
@@ -217,8 +217,15 @@
     StreamDataCounters* rtp_stats,
     StreamDataCounters* rtx_stats) const {
   MutexLock lock(&lock_);
-  *rtp_stats = rtp_stats_;
-  *rtx_stats = rtx_rtp_stats_;
+  if (rtp_stats_callback_) {
+    *rtp_stats = rtp_stats_callback_->GetDataCounters(ssrc_);
+    if (rtx_ssrc_.has_value()) {
+      *rtx_stats = rtp_stats_callback_->GetDataCounters(*rtx_ssrc_);
+    }
+  } else {
+    *rtp_stats = rtp_stats_;
+    *rtx_stats = rtx_rtp_stats_;
+  }
 }
 
 void DEPRECATED_RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
@@ -329,8 +336,13 @@
 void DEPRECATED_RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) {
   Timestamp now = env_.clock().CurrentTime();
 
-  StreamDataCounters* counters =
-      packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
+  StreamDataCounters* counters = nullptr;
+  if (rtp_stats_callback_) {
+    rtp_stats_ = rtp_stats_callback_->GetDataCounters(packet.Ssrc());
+    counters = &rtp_stats_;
+  } else {
+    counters = packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
+  }
 
   counters->MaybeSetFirstPacketTime(now);
 
diff --git a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
index d1040d5..6fdbc4f 100644
--- a/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/deprecated/deprecated_rtp_sender_egress.h
@@ -120,8 +120,10 @@
   bool force_part_of_allocation_ RTC_GUARDED_BY(lock_);
   uint32_t timestamp_offset_ RTC_GUARDED_BY(lock_);
 
+  // These counters are only used if `rtp_stats_callback_` is null.
   StreamDataCounters rtp_stats_ RTC_GUARDED_BY(lock_);
   StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(lock_);
+
   // One element per value in RtpPacketMediaType, with index matching value.
   std::vector<BitrateTracker> send_rates_ RTC_GUARDED_BY(lock_);
 
diff --git a/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc b/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc
index 3cf5732..b488106 100644
--- a/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_rtcp_impl2_unittest.cc
@@ -159,7 +159,8 @@
 };
 
 class RtpRtcpModule : public RtcpPacketTypeCounterObserver,
-                      public SendPacketObserver {
+                      public SendPacketObserver,
+                      public StreamDataCountersCallback {
  public:
   struct SentPacket {
     SentPacket(uint16_t packet_id, Timestamp capture_time, uint32_t ssrc)
@@ -202,6 +203,15 @@
     }
   }
 
+  StreamDataCounters GetDataCounters(uint32_t ssrc) const override {
+    auto it = counters_by_ssrc.find(ssrc);
+    return it != counters_by_ssrc.end() ? it->second : StreamDataCounters();
+  }
+  void DataCountersUpdated(const StreamDataCounters& counters,
+                           uint32_t ssrc) override {
+    counters_by_ssrc[ssrc] = counters;
+  }
+
   std::optional<SentPacket> last_sent_packet() const {
     return last_sent_packet_;
   }
@@ -249,6 +259,7 @@
     config.need_rtp_packet_infos = true;
     config.non_sender_rtt_measurement = true;
     config.send_packet_observer = this;
+    config.rtp_stats_callback = this;
     config.fec_generator = fec_generator_;
     impl_ = std::make_unique<ModuleRtpRtcpImpl2>(env_, config);
     impl_->SetRemoteSSRC(is_sender_ ? kReceiverSsrc : kSenderSsrc);
@@ -257,6 +268,7 @@
 
  private:
   std::map<uint32_t, RtcpPacketTypeCounter> counter_map_;
+  std::map<uint32_t, StreamDataCounters> counters_by_ssrc;
   std::optional<SentPacket> last_sent_packet_;
   VideoFecGenerator* fec_generator_ = nullptr;
   TimeDelta rtcp_report_interval_ = kDefaultReportInterval;
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.cc b/modules/rtp_rtcp/source/rtp_sender_egress.cc
index 1f110d1..640601f 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.cc
@@ -346,8 +346,15 @@
 void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
                                       StreamDataCounters* rtx_stats) const {
   RTC_DCHECK_RUN_ON(worker_queue_);
-  *rtp_stats = rtp_stats_;
-  *rtx_stats = rtx_rtp_stats_;
+  if (rtp_stats_callback_) {
+    *rtp_stats = rtp_stats_callback_->GetDataCounters(ssrc_);
+    if (rtx_ssrc_.has_value()) {
+      *rtx_stats = rtp_stats_callback_->GetDataCounters(*rtx_ssrc_);
+    }
+  } else {
+    *rtp_stats = rtp_stats_;
+    *rtx_stats = rtx_rtp_stats_;
+  }
 }
 
 void RtpSenderEgress::ForceIncludeSendPacketsInAllocation(
@@ -464,8 +471,13 @@
   // worker thread.
   RtpSendRates send_rates;
 
-  StreamDataCounters* counters =
-      packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
+  StreamDataCounters* counters = nullptr;
+  if (rtp_stats_callback_) {
+    rtp_stats_ = rtp_stats_callback_->GetDataCounters(packet_ssrc);
+    counters = &rtp_stats_;
+  } else {
+    counters = packet_ssrc == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
+  }
 
   counters->MaybeSetFirstPacketTime(now);
 
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress.h b/modules/rtp_rtcp/source/rtp_sender_egress.h
index ad369b7..7c4bb2a 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress.h
+++ b/modules/rtp_rtcp/source/rtp_sender_egress.h
@@ -148,8 +148,10 @@
   bool force_part_of_allocation_ RTC_GUARDED_BY(worker_queue_);
   uint32_t timestamp_offset_ RTC_GUARDED_BY(worker_queue_);
 
+  // These counters are only used if `rtp_stats_callback_` is null.
   StreamDataCounters rtp_stats_ RTC_GUARDED_BY(worker_queue_);
   StreamDataCounters rtx_rtp_stats_ RTC_GUARDED_BY(worker_queue_);
+
   // One element per value in RtpPacketMediaType, with index matching value.
   std::vector<BitrateTracker> send_rates_ RTC_GUARDED_BY(worker_queue_);
   std::optional<std::pair<FecProtectionParams, FecProtectionParams>>
diff --git a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
index 327edea..5497308 100644
--- a/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
+++ b/modules/rtp_rtcp/source/rtp_sender_egress_unittest.cc
@@ -12,6 +12,7 @@
 
 #include <cstddef>
 #include <cstdint>
+#include <map>
 #include <memory>
 #include <optional>
 #include <utility>
@@ -74,9 +75,20 @@
 class MockStreamDataCountersCallback : public StreamDataCountersCallback {
  public:
   MOCK_METHOD(void,
-              DataCountersUpdated,
-              (const StreamDataCounters& counters, uint32_t ssrc),
-              (override));
+              MockDataCountersUpdated,
+              (const StreamDataCounters& counters, uint32_t ssrc));
+
+  StreamDataCounters GetDataCounters(uint32_t ssrc) const override {
+    auto it = counters_by_ssrc.find(ssrc);
+    return it != counters_by_ssrc.end() ? it->second : StreamDataCounters();
+  }
+  void DataCountersUpdated(const StreamDataCounters& counters,
+                           uint32_t ssrc) override {
+    MockDataCountersUpdated(counters, ssrc);
+    counters_by_ssrc[ssrc] = counters;
+  }
+
+  std::map<uint32_t, StreamDataCounters> counters_by_ssrc;
 };
 
 struct TransmittedPacket {
@@ -571,14 +583,14 @@
   expected_transmitted_counter.header_bytes += media_packet->headers_size();
   expected_transmitted_counter.total_packet_delay += TimeDelta::Millis(10);
 
-  EXPECT_CALL(
-      mock_rtp_stats_callback_,
-      DataCountersUpdated(AllOf(Field(&StreamDataCounters::transmitted,
-                                      expected_transmitted_counter),
-                                Field(&StreamDataCounters::retransmitted,
-                                      expected_retransmission_counter),
-                                Field(&StreamDataCounters::fec, kEmptyCounter)),
-                          kSsrc));
+  EXPECT_CALL(mock_rtp_stats_callback_,
+              MockDataCountersUpdated(
+                  AllOf(Field(&StreamDataCounters::transmitted,
+                              expected_transmitted_counter),
+                        Field(&StreamDataCounters::retransmitted,
+                              expected_retransmission_counter),
+                        Field(&StreamDataCounters::fec, kEmptyCounter)),
+                  kSsrc));
   sender->SendPacket(std::move(media_packet), PacedPacketInfo());
   time_controller_.AdvanceTime(TimeDelta::Zero());
 
@@ -604,14 +616,14 @@
       retransmission_packet->headers_size();
   expected_retransmission_counter.total_packet_delay += TimeDelta::Millis(20);
 
-  EXPECT_CALL(
-      mock_rtp_stats_callback_,
-      DataCountersUpdated(AllOf(Field(&StreamDataCounters::transmitted,
-                                      expected_transmitted_counter),
-                                Field(&StreamDataCounters::retransmitted,
-                                      expected_retransmission_counter),
-                                Field(&StreamDataCounters::fec, kEmptyCounter)),
-                          kSsrc));
+  EXPECT_CALL(mock_rtp_stats_callback_,
+              MockDataCountersUpdated(
+                  AllOf(Field(&StreamDataCounters::transmitted,
+                              expected_transmitted_counter),
+                        Field(&StreamDataCounters::retransmitted,
+                              expected_retransmission_counter),
+                        Field(&StreamDataCounters::fec, kEmptyCounter)),
+                  kSsrc));
   sender->SendPacket(std::move(retransmission_packet), PacedPacketInfo());
   time_controller_.AdvanceTime(TimeDelta::Zero());
 
@@ -626,14 +638,14 @@
   expected_transmitted_counter.header_bytes += padding_packet->headers_size();
   expected_transmitted_counter.total_packet_delay += TimeDelta::Millis(30);
 
-  EXPECT_CALL(
-      mock_rtp_stats_callback_,
-      DataCountersUpdated(AllOf(Field(&StreamDataCounters::transmitted,
-                                      expected_transmitted_counter),
-                                Field(&StreamDataCounters::retransmitted,
-                                      expected_retransmission_counter),
-                                Field(&StreamDataCounters::fec, kEmptyCounter)),
-                          kSsrc));
+  EXPECT_CALL(mock_rtp_stats_callback_,
+              MockDataCountersUpdated(
+                  AllOf(Field(&StreamDataCounters::transmitted,
+                              expected_transmitted_counter),
+                        Field(&StreamDataCounters::retransmitted,
+                              expected_retransmission_counter),
+                        Field(&StreamDataCounters::fec, kEmptyCounter)),
+                  kSsrc));
   sender->SendPacket(std::move(padding_packet), PacedPacketInfo());
   time_controller_.AdvanceTime(TimeDelta::Zero());
 }
@@ -654,7 +666,7 @@
 
   EXPECT_CALL(
       mock_rtp_stats_callback_,
-      DataCountersUpdated(
+      MockDataCountersUpdated(
           AllOf(Field(&StreamDataCounters::transmitted,
                       expected_transmitted_counter),
                 Field(&StreamDataCounters::retransmitted, kEmptyCounter),
@@ -678,7 +690,7 @@
 
   EXPECT_CALL(
       mock_rtp_stats_callback_,
-      DataCountersUpdated(
+      MockDataCountersUpdated(
           AllOf(Field(&StreamDataCounters::transmitted,
                       expected_transmitted_counter),
                 Field(&StreamDataCounters::retransmitted, kEmptyCounter),
@@ -888,11 +900,15 @@
   sender.SendPacket(std::move(fec_packet), PacedPacketInfo());
 
   time_controller_.AdvanceTime(TimeDelta::Zero());
-  StreamDataCounters rtp_stats;
-  StreamDataCounters rtx_stats;
+  StreamDataCounters rtp_stats =
+      mock_rtp_stats_callback_.GetDataCounters(kSsrc);
+  StreamDataCounters rtx_stats =
+      mock_rtp_stats_callback_.GetDataCounters(kRtxSsrc);
+  StreamDataCounters fec_stats =
+      mock_rtp_stats_callback_.GetDataCounters(kFlexFecSsrc);
   sender.GetDataCounters(&rtp_stats, &rtx_stats);
-  EXPECT_EQ(rtp_stats.transmitted.packets, 2u);
-  EXPECT_EQ(rtp_stats.fec.packets, 1u);
+  EXPECT_EQ(rtp_stats.transmitted.packets, 1u);
+  EXPECT_EQ(fec_stats.transmitted.packets, 1u);
   EXPECT_EQ(rtx_stats.retransmitted.packets, 1u);
 }
 
diff --git a/test/scenario/BUILD.gn b/test/scenario/BUILD.gn
index f5c5cd5..812db86 100644
--- a/test/scenario/BUILD.gn
+++ b/test/scenario/BUILD.gn
@@ -197,6 +197,7 @@
       "../../system_wrappers:field_trial",
       "../../test:field_trial",
       "../../test:test_support",
+      "../../test:video_test_constants",
       "../logging:log_writer",
       "//testing/gmock",
     ]
diff --git a/test/scenario/video_stream_unittest.cc b/test/scenario/video_stream_unittest.cc
index c55dbf2..b0d1f4d 100644
--- a/test/scenario/video_stream_unittest.cc
+++ b/test/scenario/video_stream_unittest.cc
@@ -14,6 +14,7 @@
 #include "test/field_trial.h"
 #include "test/gtest.h"
 #include "test/scenario/scenario.h"
+#include "test/video_test_constants.h"
 
 namespace webrtc {
 namespace test {
@@ -176,7 +177,9 @@
   s.RunFor(TimeDelta::Seconds(5));
   VideoSendStream::Stats video_stats;
   route->first()->SendTask([&]() { video_stats = video->send()->GetStats(); });
-  EXPECT_GT(video_stats.substreams.begin()->second.rtp_stats.fec.packets, 0u);
+  EXPECT_GT(video_stats.substreams[VideoTestConstants::kFlexfecSendSsrc]
+                .rtp_stats.fec.packets,
+            0u);
 }
 
 TEST(VideoStreamTest, ResolutionAdaptsToAvailableBandwidth) {
diff --git a/video/send_statistics_proxy.cc b/video/send_statistics_proxy.cc
index 6c64da8..dad623d 100644
--- a/video/send_statistics_proxy.cc
+++ b/video/send_statistics_proxy.cc
@@ -1348,6 +1348,13 @@
   stats->report_block_data = std::move(report_block);
 }
 
+StreamDataCounters SendStatisticsProxy::GetDataCounters(uint32_t ssrc) const {
+  MutexLock lock(&mutex_);
+  auto it = stats_.substreams.find(ssrc);
+  return it != stats_.substreams.end() ? it->second.rtp_stats
+                                       : StreamDataCounters();
+}
+
 void SendStatisticsProxy::DataCountersUpdated(
     const StreamDataCounters& counters,
     uint32_t ssrc) {
@@ -1355,12 +1362,6 @@
   VideoSendStream::StreamStats* stats = GetStatsEntry(ssrc);
   RTC_DCHECK(stats) << "DataCountersUpdated reported for unknown ssrc " << ssrc;
 
-  if (stats->type == VideoSendStream::StreamStats::StreamType::kFlexfec) {
-    // The same counters are reported for both the media ssrc and flexfec ssrc.
-    // Bitrate stats are summed for all SSRCs. Use fec stats from media update.
-    return;
-  }
-
   stats->rtp_stats = counters;
   if (uma_container_->first_rtp_stats_time_ms_ == -1) {
     int64_t now_ms = clock_->TimeInMilliseconds();
diff --git a/video/send_statistics_proxy.h b/video/send_statistics_proxy.h
index a2deab4..e7782c8 100644
--- a/video/send_statistics_proxy.h
+++ b/video/send_statistics_proxy.h
@@ -116,6 +116,7 @@
       uint32_t ssrc,
       const RtcpPacketTypeCounter& packet_counter) override;
   // From StreamDataCountersCallback.
+  StreamDataCounters GetDataCounters(uint32_t ssrc) const override;
   void DataCountersUpdated(const StreamDataCounters& counters,
                            uint32_t ssrc) override;
 
diff --git a/video/send_statistics_proxy_unittest.cc b/video/send_statistics_proxy_unittest.cc
index f2a757c..5a1147b 100644
--- a/video/send_statistics_proxy_unittest.cc
+++ b/video/send_statistics_proxy_unittest.cc
@@ -2601,6 +2601,7 @@
       static_cast<StreamDataCountersCallback*>(statistics_proxy_.get());
   StreamDataCounters counters;
   StreamDataCounters rtx_counters;
+  StreamDataCounters flexfec_counters;
 
   const int kMinRequiredPeriodSamples = 8;
   const int kPeriodIntervalMs = 2000;
@@ -2610,10 +2611,10 @@
     counters.transmitted.padding_bytes += 1000;
     counters.transmitted.payload_bytes += 2000;
     counters.retransmitted.packets += 2;
-    counters.retransmitted.header_bytes += 25;
-    counters.retransmitted.padding_bytes += 100;
+    counters.retransmitted.header_bytes += 50;
+    counters.retransmitted.padding_bytes += 200;
     counters.retransmitted.payload_bytes += 250;
-    counters.fec = counters.retransmitted;
+    flexfec_counters.fec = counters.retransmitted;
     rtx_counters.transmitted = counters.transmitted;
     // Advance one interval and update counters.
     fake_clock_.AdvanceTimeMilliseconds(kPeriodIntervalMs);
@@ -2621,7 +2622,7 @@
     proxy->DataCountersUpdated(counters, kSecondSsrc);
     proxy->DataCountersUpdated(rtx_counters, kFirstRtxSsrc);
     proxy->DataCountersUpdated(rtx_counters, kSecondRtxSsrc);
-    proxy->DataCountersUpdated(counters, kFlexFecSsrc);
+    proxy->DataCountersUpdated(flexfec_counters, kFlexFecSsrc);
   }
 
   statistics_proxy_.reset();
@@ -2632,25 +2633,25 @@
   EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.RtxBitrateSentInKbps"));
   EXPECT_METRIC_EQ(1,
                    metrics::NumEvents("WebRTC.Video.RtxBitrateSentInKbps", 28));
-  // Interval: (2000 - 2 * 250) bytes / 2 sec = 1500 bytes / sec  = 12 kbps
+  // Interval: (2000 - 250) bytes / 2 sec = 1750 bytes / sec  = 14 kbps
   EXPECT_METRIC_EQ(1,
                    metrics::NumSamples("WebRTC.Video.MediaBitrateSentInKbps"));
   EXPECT_METRIC_EQ(
-      1, metrics::NumEvents("WebRTC.Video.MediaBitrateSentInKbps", 12));
+      1, metrics::NumEvents("WebRTC.Video.MediaBitrateSentInKbps", 14));
   // Interval: 1000 bytes * 4 / 2 sec = 2000 bytes / sec = 16 kbps
   EXPECT_METRIC_EQ(
       1, metrics::NumSamples("WebRTC.Video.PaddingBitrateSentInKbps"));
   EXPECT_METRIC_EQ(
       1, metrics::NumEvents("WebRTC.Video.PaddingBitrateSentInKbps", 16));
-  // Interval: 375 bytes * 2 / 2 sec = 375 bytes / sec = 3 kbps
+  // Interval: 500 bytes / 2 sec = 200 bytes / sec = 2 kbps
   EXPECT_METRIC_EQ(1, metrics::NumSamples("WebRTC.Video.FecBitrateSentInKbps"));
   EXPECT_METRIC_EQ(1,
-                   metrics::NumEvents("WebRTC.Video.FecBitrateSentInKbps", 3));
-  // Interval: 375 bytes * 2 / 2 sec = 375 bytes / sec = 3 kbps
+                   metrics::NumEvents("WebRTC.Video.FecBitrateSentInKbps", 2));
+  // Interval: 500 bytes * 2 / 2 sec = 375 bytes / sec = 4 kbps
   EXPECT_METRIC_EQ(
       1, metrics::NumSamples("WebRTC.Video.RetransmittedBitrateSentInKbps"));
   EXPECT_METRIC_EQ(
-      1, metrics::NumEvents("WebRTC.Video.RetransmittedBitrateSentInKbps", 3));
+      1, metrics::NumEvents("WebRTC.Video.RetransmittedBitrateSentInKbps", 4));
 }
 
 TEST_F(SendStatisticsProxyTest, ResetsRtpCountersOnContentChange) {