Add per source network statistics

Add ability to obtain incoming data network statistic splitted by data
source IP address.

Bug: webrtc:11756
Change-Id: I023c99f6bd19363a52a358dba52d25cd507097ae
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/179368
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Andrey Logvin <landrey@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31770}
diff --git a/api/test/network_emulation/network_emulation_interfaces.h b/api/test/network_emulation/network_emulation_interfaces.h
index 0986df4..d27319e 100644
--- a/api/test/network_emulation/network_emulation_interfaces.h
+++ b/api/test/network_emulation/network_emulation_interfaces.h
@@ -10,6 +10,8 @@
 #ifndef API_TEST_NETWORK_EMULATION_NETWORK_EMULATION_INTERFACES_H_
 #define API_TEST_NETWORK_EMULATION_NETWORK_EMULATION_INTERFACES_H_
 
+#include <map>
+
 #include "absl/types/optional.h"
 #include "api/units/data_rate.h"
 #include "api/units/data_size.h"
@@ -56,9 +58,7 @@
   virtual void OnPacketReceived(EmulatedIpPacket packet) = 0;
 };
 
-struct EmulatedNetworkStats {
-  int64_t packets_sent = 0;
-  DataSize bytes_sent = DataSize::Zero();
+struct EmulatedNetworkIncomingStats {
   // Total amount of packets received with or without destination.
   int64_t packets_received = 0;
   // Total amount of bytes in received packets.
@@ -69,22 +69,118 @@
   DataSize bytes_dropped = DataSize::Zero();
 
   DataSize first_received_packet_size = DataSize::Zero();
-  DataSize first_sent_packet_size = DataSize::Zero();
 
-  Timestamp first_packet_sent_time = Timestamp::PlusInfinity();
-  Timestamp last_packet_sent_time = Timestamp::PlusInfinity();
+  // Timestamps are initialized to different infinities for simplifying
+  // computations. Client have to assume that it is some infinite value
+  // if unset. Client mustn't consider sign of infinit value.
   Timestamp first_packet_received_time = Timestamp::PlusInfinity();
-  Timestamp last_packet_received_time = Timestamp::PlusInfinity();
+  Timestamp last_packet_received_time = Timestamp::MinusInfinity();
+
+  DataRate AverageReceiveRate() const {
+    RTC_DCHECK_GE(packets_received, 2);
+    RTC_DCHECK(first_packet_received_time.IsFinite());
+    RTC_DCHECK(last_packet_received_time.IsFinite());
+    return (bytes_received - first_received_packet_size) /
+           (last_packet_received_time - first_packet_received_time);
+  }
+};
+
+struct EmulatedNetworkStats {
+  int64_t packets_sent = 0;
+  DataSize bytes_sent = DataSize::Zero();
+
+  DataSize first_sent_packet_size = DataSize::Zero();
+  Timestamp first_packet_sent_time = Timestamp::PlusInfinity();
+  Timestamp last_packet_sent_time = Timestamp::MinusInfinity();
+
+  std::map<rtc::IPAddress, EmulatedNetworkIncomingStats>
+      incoming_stats_per_source;
 
   DataRate AverageSendRate() const {
     RTC_DCHECK_GE(packets_sent, 2);
     return (bytes_sent - first_sent_packet_size) /
            (last_packet_sent_time - first_packet_sent_time);
   }
+
+  // Total amount of packets received regardless of the destination address.
+  int64_t PacketsReceived() const {
+    int64_t packets_received = 0;
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      packets_received += incoming_stats.second.packets_received;
+    }
+    return packets_received;
+  }
+
+  // Total amount of bytes in received packets.
+  DataSize BytesReceived() const {
+    DataSize bytes_received = DataSize::Zero();
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      bytes_received += incoming_stats.second.bytes_received;
+    }
+    return bytes_received;
+  }
+
+  // Total amount of packets that were received, but no destination was found.
+  int64_t PacketsDropped() const {
+    int64_t packets_dropped = 0;
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      packets_dropped += incoming_stats.second.packets_dropped;
+    }
+    return packets_dropped;
+  }
+
+  // Total amount of bytes in dropped packets.
+  DataSize BytesDropped() const {
+    DataSize bytes_dropped = DataSize::Zero();
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      bytes_dropped += incoming_stats.second.bytes_dropped;
+    }
+    return bytes_dropped;
+  }
+
+  DataSize FirstReceivedPacketSize() const {
+    Timestamp first_packet_received_time = Timestamp::PlusInfinity();
+    DataSize first_received_packet_size = DataSize::Zero();
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      if (first_packet_received_time >
+          incoming_stats.second.first_packet_received_time) {
+        first_packet_received_time =
+            incoming_stats.second.first_packet_received_time;
+        first_received_packet_size =
+            incoming_stats.second.first_received_packet_size;
+      }
+    }
+    return first_received_packet_size;
+  }
+
+  Timestamp FirstPacketReceivedTime() const {
+    Timestamp first_packet_received_time = Timestamp::PlusInfinity();
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      if (first_packet_received_time >
+          incoming_stats.second.first_packet_received_time) {
+        first_packet_received_time =
+            incoming_stats.second.first_packet_received_time;
+      }
+    }
+    return first_packet_received_time;
+  }
+
+  Timestamp LastPacketReceivedTime() const {
+    Timestamp last_packet_received_time = Timestamp::MinusInfinity();
+    for (const auto& incoming_stats : incoming_stats_per_source) {
+      if (last_packet_received_time <
+          incoming_stats.second.last_packet_received_time) {
+        last_packet_received_time =
+            incoming_stats.second.last_packet_received_time;
+      }
+    }
+    return last_packet_received_time;
+  }
+
   DataRate AverageReceiveRate() const {
-    RTC_DCHECK_GE(packets_received, 2);
-    return (bytes_received - first_received_packet_size) /
-           (last_packet_received_time - first_packet_received_time);
+    RTC_DCHECK_GE(PacketsReceived(), 2);
+    return (BytesReceived() - FirstReceivedPacketSize()) /
+           (LastPacketReceivedTime() - FirstPacketReceivedTime());
   }
 };
 
diff --git a/test/network/network_emulation.cc b/test/network/network_emulation.cc
index 3cb3def..37e307e 100644
--- a/test/network/network_emulation.cc
+++ b/test/network/network_emulation.cc
@@ -290,8 +290,9 @@
     // process: one peer closed connection, second still sending data.
     RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
                   << " on port " << packet.to.port();
-    stats_.packets_dropped++;
-    stats_.bytes_dropped += DataSize::Bytes(packet.ip_packet_size());
+    stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_dropped++;
+    stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_dropped +=
+        DataSize::Bytes(packet.ip_packet_size());
     return;
   }
   // Endpoint assumes frequent calls to bind and unbind methods, so it holds
@@ -325,14 +326,18 @@
 void EmulatedEndpointImpl::UpdateReceiveStats(const EmulatedIpPacket& packet) {
   RTC_DCHECK_RUN_ON(task_queue_);
   Timestamp current_time = clock_->CurrentTime();
-  if (stats_.first_packet_received_time.IsInfinite()) {
-    stats_.first_packet_received_time = current_time;
-    stats_.first_received_packet_size =
-        DataSize::Bytes(packet.ip_packet_size());
+  if (stats_.incoming_stats_per_source[packet.from.ipaddr()]
+          .first_packet_received_time.IsInfinite()) {
+    stats_.incoming_stats_per_source[packet.from.ipaddr()]
+        .first_packet_received_time = current_time;
+    stats_.incoming_stats_per_source[packet.from.ipaddr()]
+        .first_received_packet_size = DataSize::Bytes(packet.ip_packet_size());
   }
-  stats_.last_packet_received_time = current_time;
-  stats_.packets_received++;
-  stats_.bytes_received += DataSize::Bytes(packet.ip_packet_size());
+  stats_.incoming_stats_per_source[packet.from.ipaddr()]
+      .last_packet_received_time = current_time;
+  stats_.incoming_stats_per_source[packet.from.ipaddr()].packets_received++;
+  stats_.incoming_stats_per_source[packet.from.ipaddr()].bytes_received +=
+      DataSize::Bytes(packet.ip_packet_size());
 }
 
 EndpointsContainer::EndpointsContainer(
@@ -377,31 +382,31 @@
     EmulatedNetworkStats endpoint_stats = endpoint->stats();
     stats.packets_sent += endpoint_stats.packets_sent;
     stats.bytes_sent += endpoint_stats.bytes_sent;
-    stats.packets_received += endpoint_stats.packets_received;
-    stats.bytes_received += endpoint_stats.bytes_received;
-    stats.packets_dropped += endpoint_stats.packets_dropped;
-    stats.bytes_dropped += endpoint_stats.bytes_dropped;
-    if (stats.first_packet_received_time >
-        endpoint_stats.first_packet_received_time) {
-      stats.first_packet_received_time =
-          endpoint_stats.first_packet_received_time;
-      stats.first_received_packet_size =
-          endpoint_stats.first_received_packet_size;
-    }
     if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
       stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
       stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
     }
-    if (stats.last_packet_received_time.IsInfinite() ||
-        stats.last_packet_received_time <
-            endpoint_stats.last_packet_received_time) {
-      stats.last_packet_received_time =
-          endpoint_stats.last_packet_received_time;
-    }
-    if (stats.last_packet_sent_time.IsInfinite() ||
-        stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
+    if (stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
       stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
     }
+    for (auto& entry : endpoint_stats.incoming_stats_per_source) {
+      const EmulatedNetworkIncomingStats& source = entry.second;
+      EmulatedNetworkIncomingStats& in_stats =
+          stats.incoming_stats_per_source[entry.first];
+      in_stats.packets_received += source.packets_received;
+      in_stats.bytes_received += source.bytes_received;
+      in_stats.packets_dropped += source.packets_dropped;
+      in_stats.bytes_dropped += source.bytes_dropped;
+      if (in_stats.first_packet_received_time >
+          source.first_packet_received_time) {
+        in_stats.first_packet_received_time = source.first_packet_received_time;
+        in_stats.first_received_packet_size = source.first_received_packet_size;
+      }
+      if (in_stats.last_packet_received_time <
+          source.last_packet_received_time) {
+        in_stats.last_packet_received_time = source.last_packet_received_time;
+      }
+    }
   }
   return stats;
 }
diff --git a/test/network/network_emulation.h b/test/network/network_emulation.h
index eadd8b2..a811a10 100644
--- a/test/network/network_emulation.h
+++ b/test/network/network_emulation.h
@@ -200,6 +200,7 @@
   EmulatedEndpointImpl* to;
   bool active;
 };
+
 class EndpointsContainer {
  public:
   explicit EndpointsContainer(
diff --git a/test/network/network_emulation_unittest.cc b/test/network/network_emulation_unittest.cc
index 6bd3b6e..fa10b1e 100644
--- a/test/network/network_emulation_unittest.cc
+++ b/test/network/network_emulation_unittest.cc
@@ -248,19 +248,52 @@
   nt1->GetStats([&](EmulatedNetworkStats st) {
     EXPECT_EQ(st.packets_sent, 2000l);
     EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
-    EXPECT_EQ(st.packets_received, 2000l);
-    EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
-    EXPECT_EQ(st.packets_dropped, 0l);
-    EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
+    EXPECT_EQ(st.PacketsReceived(), 2000l);
+    EXPECT_EQ(st.BytesReceived().bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.PacketsDropped(), 0l);
+    EXPECT_EQ(st.BytesDropped().bytes(), 0l);
+
+    EXPECT_EQ(st.incoming_stats_per_source[bob_endpoint->GetPeerLocalAddress()]
+                  .packets_received,
+              2000l);
+    EXPECT_EQ(st.incoming_stats_per_source[bob_endpoint->GetPeerLocalAddress()]
+                  .bytes_received.bytes(),
+              single_packet_size * 2000l);
+    EXPECT_EQ(st.incoming_stats_per_source[bob_endpoint->GetPeerLocalAddress()]
+                  .packets_dropped,
+              0l);
+    EXPECT_EQ(st.incoming_stats_per_source[bob_endpoint->GetPeerLocalAddress()]
+                  .bytes_dropped.bytes(),
+              0l);
     received_stats_count++;
   });
   nt2->GetStats([&](EmulatedNetworkStats st) {
     EXPECT_EQ(st.packets_sent, 2000l);
     EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
-    EXPECT_EQ(st.packets_received, 2000l);
-    EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
-    EXPECT_EQ(st.packets_dropped, 0l);
-    EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
+    EXPECT_EQ(st.PacketsReceived(), 2000l);
+    EXPECT_EQ(st.BytesReceived().bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.PacketsDropped(), 0l);
+    EXPECT_EQ(st.BytesDropped().bytes(), 0l);
+    EXPECT_GT(st.FirstReceivedPacketSize(), DataSize::Zero());
+    EXPECT_TRUE(st.FirstPacketReceivedTime().IsFinite());
+    EXPECT_TRUE(st.LastPacketReceivedTime().IsFinite());
+
+    EXPECT_EQ(
+        st.incoming_stats_per_source[alice_endpoint->GetPeerLocalAddress()]
+            .packets_received,
+        2000l);
+    EXPECT_EQ(
+        st.incoming_stats_per_source[alice_endpoint->GetPeerLocalAddress()]
+            .bytes_received.bytes(),
+        single_packet_size * 2000l);
+    EXPECT_EQ(
+        st.incoming_stats_per_source[alice_endpoint->GetPeerLocalAddress()]
+            .packets_dropped,
+        0l);
+    EXPECT_EQ(
+        st.incoming_stats_per_source[alice_endpoint->GetPeerLocalAddress()]
+            .bytes_dropped.bytes(),
+        0l);
     received_stats_count++;
   });
   ASSERT_EQ_SIMULATED_WAIT(received_stats_count.load(), 2,
diff --git a/test/pc/e2e/network_quality_metrics_reporter.cc b/test/pc/e2e/network_quality_metrics_reporter.cc
index 85276ee..cd6dfb5 100644
--- a/test/pc/e2e/network_quality_metrics_reporter.cc
+++ b/test/pc/e2e/network_quality_metrics_reporter.cc
@@ -36,10 +36,10 @@
   // Check that network stats are clean before test execution.
   EmulatedNetworkStats alice_stats = PopulateStats(alice_network_);
   RTC_CHECK_EQ(alice_stats.packets_sent, 0);
-  RTC_CHECK_EQ(alice_stats.packets_received, 0);
+  RTC_CHECK_EQ(alice_stats.PacketsReceived(), 0);
   EmulatedNetworkStats bob_stats = PopulateStats(bob_network_);
   RTC_CHECK_EQ(bob_stats.packets_sent, 0);
-  RTC_CHECK_EQ(bob_stats.packets_received, 0);
+  RTC_CHECK_EQ(bob_stats.PacketsReceived(), 0);
 }
 
 void NetworkQualityMetricsReporter::OnStatsReports(
@@ -72,9 +72,9 @@
   EmulatedNetworkStats alice_stats = PopulateStats(alice_network_);
   EmulatedNetworkStats bob_stats = PopulateStats(bob_network_);
   ReportStats("alice", alice_stats,
-              alice_stats.packets_sent - bob_stats.packets_received);
+              alice_stats.packets_sent - bob_stats.PacketsReceived());
   ReportStats("bob", bob_stats,
-              bob_stats.packets_sent - alice_stats.packets_received);
+              bob_stats.packets_sent - alice_stats.PacketsReceived());
 
   if (!webrtc::field_trial::IsEnabled(kUseStandardBytesStats)) {
     RTC_LOG(LS_ERROR)
@@ -111,16 +111,16 @@
       "average_send_rate", network_label,
       stats.packets_sent >= 2 ? stats.AverageSendRate().bytes_per_sec() : 0,
       "bytesPerSecond");
-  ReportResult("bytes_dropped", network_label, stats.bytes_dropped.bytes(),
+  ReportResult("bytes_dropped", network_label, stats.BytesDropped().bytes(),
                "sizeInBytes");
-  ReportResult("packets_dropped", network_label, stats.packets_dropped,
+  ReportResult("packets_dropped", network_label, stats.PacketsDropped(),
                "unitless");
-  ReportResult("bytes_received", network_label, stats.bytes_received.bytes(),
+  ReportResult("bytes_received", network_label, stats.BytesReceived().bytes(),
                "sizeInBytes");
-  ReportResult("packets_received", network_label, stats.packets_received,
+  ReportResult("packets_received", network_label, stats.PacketsReceived(),
                "unitless");
   ReportResult("average_receive_rate", network_label,
-               stats.packets_received >= 2
+               stats.PacketsReceived() >= 2
                    ? stats.AverageReceiveRate().bytes_per_sec()
                    : 0,
                "bytesPerSecond");