Introduce network emulation layer stats API.

Bug: webrtc:10138
Change-Id: I32133cd14c7a1933dcbeaa37d4c9ce6748274ebe
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/131383
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27588}
diff --git a/api/BUILD.gn b/api/BUILD.gn
index 31dd898..45d3bb5 100644
--- a/api/BUILD.gn
+++ b/api/BUILD.gn
@@ -434,6 +434,9 @@
   deps = [
     ":simulated_network_api",
     "../rtc_base",
+    "units:data_rate",
+    "units:data_size",
+    "units:timestamp",
   ]
 }
 
diff --git a/api/test/network_emulation_manager.h b/api/test/network_emulation_manager.h
index b409bce..404a8c0 100644
--- a/api/test/network_emulation_manager.h
+++ b/api/test/network_emulation_manager.h
@@ -15,6 +15,9 @@
 #include <vector>
 
 #include "api/test/simulated_network.h"
+#include "api/units/data_rate.h"
+#include "api/units/data_size.h"
+#include "api/units/timestamp.h"
 #include "rtc_base/network.h"
 #include "rtc_base/thread.h"
 
@@ -51,14 +54,51 @@
   bool start_as_enabled = true;
 };
 
+struct EmulatedNetworkStats {
+  int64_t packets_sent = 0;
+  DataSize bytes_sent = DataSize::Zero();
+  // Total amount of packets received with or without destination.
+  int64_t packets_received = 0;
+  // Total amount of bytes in received packets.
+  DataSize bytes_received = DataSize::Zero();
+  // Total amount of packets that were received, but no destination was found.
+  int64_t packets_dropped = 0;
+  // Total amount of bytes in dropped packets.
+  DataSize bytes_dropped = DataSize::Zero();
+
+  DataSize first_received_packet_size = DataSize::Zero();
+  DataSize first_sent_packet_size = DataSize::Zero();
+
+  Timestamp first_packet_sent_time = Timestamp::PlusInfinity();
+  Timestamp last_packet_sent_time = Timestamp::PlusInfinity();
+  Timestamp first_packet_received_time = Timestamp::PlusInfinity();
+  Timestamp last_packet_received_time = Timestamp::PlusInfinity();
+
+  DataRate AverageSendRate() const {
+    RTC_DCHECK_GE(packets_sent, 2);
+    return (bytes_sent - first_sent_packet_size) /
+           (last_packet_sent_time - first_packet_sent_time);
+  }
+  DataRate AverageReceiveRate() const {
+    RTC_DCHECK_GE(packets_received, 2);
+    return (bytes_received - first_received_packet_size) /
+           (last_packet_received_time - first_packet_received_time);
+  }
+};
+
 // Provide interface to obtain all required objects to inject network emulation
-// layer into PeerConnection.
+// layer into PeerConnection. Also contains information about network interfaces
+// accessible by PeerConnection.
 class EmulatedNetworkManagerInterface {
  public:
   virtual ~EmulatedNetworkManagerInterface() = default;
 
   virtual rtc::Thread* network_thread() = 0;
   virtual rtc::NetworkManager* network_manager() = 0;
+
+  // Returns summarized network stats for endpoints for this manager.
+  virtual void GetStats(
+      std::function<void(EmulatedNetworkStats)> stats_callback) const = 0;
 };
 
 // Provides an API for creating and configuring emulated network layer.
diff --git a/test/scenario/network/emulated_network_manager.cc b/test/scenario/network/emulated_network_manager.cc
index b6f349b..5ee9c16 100644
--- a/test/scenario/network/emulated_network_manager.cc
+++ b/test/scenario/network/emulated_network_manager.cc
@@ -20,8 +20,10 @@
 
 EmulatedNetworkManager::EmulatedNetworkManager(
     Clock* clock,
+    TaskQueueForTest* task_queue,
     EndpointsContainer* endpoints_container)
-    : endpoints_container_(endpoints_container),
+    : task_queue_(task_queue),
+      endpoints_container_(endpoints_container),
       socket_server_(clock, endpoints_container),
       network_thread_(&socket_server_),
       sent_first_update_(false),
@@ -77,6 +79,13 @@
   }
 }
 
+void EmulatedNetworkManager::GetStats(
+    std::function<void(EmulatedNetworkStats)> stats_callback) const {
+  task_queue_->PostTask([stats_callback, this]() {
+    stats_callback(endpoints_container_->GetStats());
+  });
+}
+
 void EmulatedNetworkManager::UpdateNetworksOnce() {
   RTC_DCHECK_RUN_ON(&network_thread_);
 
diff --git a/test/scenario/network/emulated_network_manager.h b/test/scenario/network/emulated_network_manager.h
index 7fb831b..7f941a4 100644
--- a/test/scenario/network/emulated_network_manager.h
+++ b/test/scenario/network/emulated_network_manager.h
@@ -32,7 +32,9 @@
                                public sigslot::has_slots<>,
                                public EmulatedNetworkManagerInterface {
  public:
-  EmulatedNetworkManager(Clock* clock, EndpointsContainer* endpoints_container);
+  EmulatedNetworkManager(Clock* clock,
+                         TaskQueueForTest* task_queue,
+                         EndpointsContainer* endpoints_container);
 
   void EnableEndpoint(EmulatedEndpoint* endpoint);
   void DisableEndpoint(EmulatedEndpoint* endpoint);
@@ -45,11 +47,14 @@
   // EmulatedNetworkManagerInterface API
   rtc::Thread* network_thread() override { return &network_thread_; }
   rtc::NetworkManager* network_manager() override { return this; }
+  void GetStats(
+      std::function<void(EmulatedNetworkStats)> stats_callback) const override;
 
  private:
   void UpdateNetworksOnce();
   void MaybeSignalNetworksChanged();
 
+  TaskQueueForTest* const task_queue_;
   EndpointsContainer* const endpoints_container_;
   FakeNetworkSocketServer socket_server_;
   rtc::Thread network_thread_;
diff --git a/test/scenario/network/network_emulation.cc b/test/scenario/network/network_emulation.cc
index 323e79c..194cf98 100644
--- a/test/scenario/network/network_emulation.cc
+++ b/test/scenario/network/network_emulation.cc
@@ -14,6 +14,7 @@
 #include <memory>
 
 #include "absl/memory/memory.h"
+#include "api/units/data_size.h"
 #include "rtc_base/bind.h"
 #include "rtc_base/logging.h"
 
@@ -198,7 +199,10 @@
                                   rtc::CopyOnWriteBuffer packet) {
   RTC_CHECK(from.ipaddr() == peer_local_addr_);
   struct Closure {
-    void operator()() { endpoint->router_.OnPacketReceived(std::move(packet)); }
+    void operator()() {
+      endpoint->UpdateSendStats(packet);
+      endpoint->router_.OnPacketReceived(std::move(packet));
+    }
     EmulatedEndpoint* endpoint;
     EmulatedIpPacket packet;
   };
@@ -258,18 +262,22 @@
 }
 
 void EmulatedEndpoint::OnPacketReceived(EmulatedIpPacket packet) {
+  RTC_DCHECK_RUN_ON(task_queue_);
   RTC_CHECK(packet.to.ipaddr() == peer_local_addr_)
       << "Routing error: wrong destination endpoint. Packet.to.ipaddr()=: "
       << packet.to.ipaddr().ToString()
       << "; Receiver peer_local_addr_=" << peer_local_addr_.ToString();
   rtc::CritScope crit(&receiver_lock_);
+  UpdateReceiveStats(packet);
   auto it = port_to_receiver_.find(packet.to.port());
   if (it == port_to_receiver_.end()) {
     // It can happen, that remote peer closed connection, but there still some
     // packets, that are going to it. It can happen during peer connection close
     // process: one peer closed connection, second still sending data.
-    RTC_LOG(INFO) << "No receiver registered in " << id_ << " on port "
-                  << packet.to.port();
+    RTC_LOG(INFO) << "Drop packet: no receiver registered in " << id_
+                  << " on port " << packet.to.port();
+    stats_.packets_dropped++;
+    stats_.bytes_dropped += DataSize::bytes(packet.size());
     return;
   }
   // Endpoint assumes frequent calls to bind and unbind methods, so it holds
@@ -295,6 +303,35 @@
   return is_enabled_;
 }
 
+EmulatedNetworkStats EmulatedEndpoint::stats() {
+  RTC_DCHECK_RUN_ON(task_queue_);
+  return stats_;
+}
+
+void EmulatedEndpoint::UpdateSendStats(const EmulatedIpPacket& packet) {
+  RTC_DCHECK_RUN_ON(task_queue_);
+  Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
+  if (stats_.first_packet_sent_time.IsInfinite()) {
+    stats_.first_packet_sent_time = current_time;
+    stats_.first_sent_packet_size = DataSize::bytes(packet.size());
+  }
+  stats_.last_packet_sent_time = current_time;
+  stats_.packets_sent++;
+  stats_.bytes_sent += DataSize::bytes(packet.size());
+}
+
+void EmulatedEndpoint::UpdateReceiveStats(const EmulatedIpPacket& packet) {
+  RTC_DCHECK_RUN_ON(task_queue_);
+  Timestamp current_time = Timestamp::us(clock_->TimeInMicroseconds());
+  if (stats_.first_packet_received_time.IsInfinite()) {
+    stats_.first_packet_received_time = current_time;
+    stats_.first_received_packet_size = DataSize::bytes(packet.size());
+  }
+  stats_.last_packet_received_time = current_time;
+  stats_.packets_received++;
+  stats_.bytes_received += DataSize::bytes(packet.size());
+}
+
 EndpointsContainer::EndpointsContainer(
     const std::vector<EmulatedEndpoint*>& endpoints)
     : endpoints_(endpoints) {}
@@ -331,4 +368,39 @@
   return networks;
 }
 
+EmulatedNetworkStats EndpointsContainer::GetStats() const {
+  EmulatedNetworkStats stats;
+  for (auto* endpoint : endpoints_) {
+    EmulatedNetworkStats endpoint_stats = endpoint->stats();
+    stats.packets_sent += endpoint_stats.packets_sent;
+    stats.bytes_sent += endpoint_stats.bytes_sent;
+    stats.packets_received += endpoint_stats.packets_received;
+    stats.bytes_received += endpoint_stats.bytes_received;
+    stats.packets_dropped += endpoint_stats.packets_dropped;
+    stats.bytes_dropped += endpoint_stats.bytes_dropped;
+    if (stats.first_packet_received_time >
+        endpoint_stats.first_packet_received_time) {
+      stats.first_packet_received_time =
+          endpoint_stats.first_packet_received_time;
+      stats.first_received_packet_size =
+          endpoint_stats.first_received_packet_size;
+    }
+    if (stats.first_packet_sent_time > endpoint_stats.first_packet_sent_time) {
+      stats.first_packet_sent_time = endpoint_stats.first_packet_sent_time;
+      stats.first_sent_packet_size = endpoint_stats.first_sent_packet_size;
+    }
+    if (stats.last_packet_received_time.IsInfinite() ||
+        stats.last_packet_received_time <
+            endpoint_stats.last_packet_received_time) {
+      stats.last_packet_received_time =
+          endpoint_stats.last_packet_received_time;
+    }
+    if (stats.last_packet_sent_time.IsInfinite() ||
+        stats.last_packet_sent_time < endpoint_stats.last_packet_sent_time) {
+      stats.last_packet_sent_time = endpoint_stats.last_packet_sent_time;
+    }
+  }
+  return stats;
+}
+
 }  // namespace webrtc
diff --git a/test/scenario/network/network_emulation.h b/test/scenario/network/network_emulation.h
index 84fd7ae..b3fe5eb 100644
--- a/test/scenario/network/network_emulation.h
+++ b/test/scenario/network/network_emulation.h
@@ -19,6 +19,7 @@
 #include <vector>
 
 #include "absl/types/optional.h"
+#include "api/test/network_emulation_manager.h"
 #include "api/test/simulated_network.h"
 #include "api/units/timestamp.h"
 #include "rtc_base/copy_on_write_buffer.h"
@@ -193,9 +194,13 @@
 
   const rtc::Network& network() const { return *network_.get(); }
 
+  EmulatedNetworkStats stats();
+
  private:
   static constexpr uint16_t kFirstEphemeralPort = 49152;
   uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_);
+  void UpdateSendStats(const EmulatedIpPacket& packet);
+  void UpdateReceiveStats(const EmulatedIpPacket& packet);
 
   rtc::CriticalSection receiver_lock_;
   rtc::ThreadChecker enabled_state_checker_;
@@ -212,6 +217,8 @@
   uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_);
   std::map<uint16_t, EmulatedNetworkReceiverInterface*> port_to_receiver_
       RTC_GUARDED_BY(receiver_lock_);
+
+  EmulatedNetworkStats stats_ RTC_GUARDED_BY(task_queue_);
 };
 
 class EmulatedRoute {
@@ -236,6 +243,7 @@
   // Returns list of networks for enabled endpoints. Caller takes ownership of
   // returned rtc::Network objects.
   std::vector<std::unique_ptr<rtc::Network>> GetEnabledNetworks() const;
+  EmulatedNetworkStats GetStats() const;
 
  private:
   const std::vector<EmulatedEndpoint*> endpoints_;
diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc
index 1efc5b4..d04805f 100644
--- a/test/scenario/network/network_emulation_manager.cc
+++ b/test/scenario/network/network_emulation_manager.cc
@@ -200,7 +200,7 @@
     const std::vector<EmulatedEndpoint*>& endpoints) {
   auto endpoints_container = absl::make_unique<EndpointsContainer>(endpoints);
   auto network_manager = absl::make_unique<EmulatedNetworkManager>(
-      clock_, endpoints_container.get());
+      clock_, &task_queue_, endpoints_container.get());
   for (auto* endpoint : endpoints) {
     // Associate endpoint with network manager.
     bool insertion_result =
diff --git a/test/scenario/network/network_emulation_unittest.cc b/test/scenario/network/network_emulation_unittest.cc
index 2ed4e6b..6abd40d 100644
--- a/test/scenario/network/network_emulation_unittest.cc
+++ b/test/scenario/network/network_emulation_unittest.cc
@@ -28,6 +28,7 @@
 namespace {
 
 constexpr int kNetworkPacketWaitTimeoutMs = 100;
+constexpr int kStatsWaitTimeoutMs = 1000;
 
 class SocketReader : public sigslot::has_slots<> {
  public:
@@ -195,6 +196,7 @@
   EmulatedNetworkManagerInterface* nt2 =
       network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
 
+  rtc::CopyOnWriteBuffer data("Hello");
   for (uint64_t j = 0; j < 2; j++) {
     auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
         AF_INET, SOCK_DGRAM);
@@ -213,7 +215,6 @@
     s1->Connect(s2->GetLocalAddress());
     s2->Connect(s1->GetLocalAddress());
 
-    rtc::CopyOnWriteBuffer data("Hello");
     for (uint64_t i = 0; i < 1000; i++) {
       s1->Send(data.data(), data.size());
       s2->Send(data.data(), data.size());
@@ -221,12 +222,96 @@
 
     rtc::Event wait;
     wait.Wait(1000);
-    ASSERT_EQ(r1.ReceivedCount(), 1000);
-    ASSERT_EQ(r2.ReceivedCount(), 1000);
+    EXPECT_EQ(r1.ReceivedCount(), 1000);
+    EXPECT_EQ(r2.ReceivedCount(), 1000);
 
     delete s1;
     delete s2;
   }
+
+  int64_t single_packet_size = data.size();
+  std::atomic<int> received_stats_count{0};
+  nt1->GetStats([&](EmulatedNetworkStats st) {
+    EXPECT_EQ(st.packets_sent, 2000l);
+    EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.packets_received, 2000l);
+    EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.packets_dropped, 0l);
+    EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
+    received_stats_count++;
+  });
+  nt2->GetStats([&](EmulatedNetworkStats st) {
+    EXPECT_EQ(st.packets_sent, 2000l);
+    EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.packets_received, 2000l);
+    EXPECT_EQ(st.bytes_received.bytes(), single_packet_size * 2000l);
+    EXPECT_EQ(st.packets_dropped, 0l);
+    EXPECT_EQ(st.bytes_dropped.bytes(), 0l);
+    received_stats_count++;
+  });
+  ASSERT_EQ_WAIT(received_stats_count.load(), 2, kStatsWaitTimeoutMs);
+}
+
+TEST(NetworkEmulationManagerTest, ThoughputStats) {
+  NetworkEmulationManagerImpl network_manager;
+
+  EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  EmulatedEndpoint* alice_endpoint =
+      network_manager.CreateEndpoint(EmulatedEndpointConfig());
+  EmulatedEndpoint* bob_endpoint =
+      network_manager.CreateEndpoint(EmulatedEndpointConfig());
+  network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint);
+  network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint);
+
+  EmulatedNetworkManagerInterface* nt1 =
+      network_manager.CreateEmulatedNetworkManagerInterface({alice_endpoint});
+  EmulatedNetworkManagerInterface* nt2 =
+      network_manager.CreateEmulatedNetworkManagerInterface({bob_endpoint});
+
+  int64_t single_packet_size = 100;
+  rtc::CopyOnWriteBuffer data(single_packet_size);
+  auto* s1 = nt1->network_thread()->socketserver()->CreateAsyncSocket(
+      AF_INET, SOCK_DGRAM);
+  auto* s2 = nt2->network_thread()->socketserver()->CreateAsyncSocket(
+      AF_INET, SOCK_DGRAM);
+
+  SocketReader r1(s1);
+  SocketReader r2(s2);
+
+  rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0);
+  rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0);
+
+  s1->Bind(a1);
+  s2->Bind(a2);
+
+  s1->Connect(s2->GetLocalAddress());
+  s2->Connect(s1->GetLocalAddress());
+
+  // Send 10 packets for 1
+  rtc::Event wait;
+  for (uint64_t i = 0; i < 11; i++) {
+    s1->Send(data.data(), data.size());
+    s2->Send(data.data(), data.size());
+    wait.Wait(100);
+  }
+  EXPECT_EQ(r1.ReceivedCount(), 11);
+  EXPECT_EQ(r2.ReceivedCount(), 11);
+
+  delete s1;
+  delete s2;
+
+  std::atomic<int> received_stats_count{0};
+  nt1->GetStats([&](EmulatedNetworkStats st) {
+    EXPECT_EQ(st.packets_sent, 11l);
+    EXPECT_EQ(st.bytes_sent.bytes(), single_packet_size * 11l);
+    EXPECT_NEAR(st.AverageSendRate().bps(), DataRate::bytes_per_sec(1000).bps(),
+                1000);
+    received_stats_count++;
+  });
+  ASSERT_EQ_WAIT(received_stats_count.load(), 1, kStatsWaitTimeoutMs);
 }
 
 // Testing that packets are delivered via all routes using a routing scheme as