UDP socket and TCP socket use AsyncPacketSocket::NotifyPacketReceived

This is done instead of directly using AsyncPacketSocket::SignalReceived.


Bug: webrtc:15368, webrtc:11943
Change-Id: I5671e66b270355188b1252138eced8e6c78ba7ad
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/327521
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41180}
diff --git a/p2p/base/stun_server_unittest.cc b/p2p/base/stun_server_unittest.cc
index 5d3f31f..e4ea30c 100644
--- a/p2p/base/stun_server_unittest.cc
+++ b/p2p/base/stun_server_unittest.cc
@@ -33,15 +33,13 @@
 
 class StunServerTest : public ::testing::Test {
  public:
-  StunServerTest() : ss_(new rtc::VirtualSocketServer()), network_(ss_.get()) {
+  StunServerTest() : ss_(new rtc::VirtualSocketServer()) {
+    ss_->SetMessageQueue(&main_thread);
     server_.reset(
         new StunServer(rtc::AsyncUDPSocket::Create(ss_.get(), server_addr)));
     client_.reset(new rtc::TestClient(
         absl::WrapUnique(rtc::AsyncUDPSocket::Create(ss_.get(), client_addr))));
-
-    network_.Start();
   }
-  ~StunServerTest() override { network_.Stop(); }
 
   void Send(const StunMessage& msg) {
     rtc::ByteBufferWriter buf;
@@ -57,7 +55,7 @@
     std::unique_ptr<rtc::TestClient::Packet> packet =
         client_->NextPacket(rtc::TestClient::kTimeoutMs);
     if (packet) {
-      rtc::ByteBufferReader buf(packet->buf, packet->size);
+      rtc::ByteBufferReader buf(packet->buf);
       msg = new StunMessage();
       msg->Read(&buf);
     }
@@ -67,7 +65,6 @@
  private:
   rtc::AutoThread main_thread;
   std::unique_ptr<rtc::VirtualSocketServer> ss_;
-  rtc::Thread network_;
   std::unique_ptr<StunServer> server_;
   std::unique_ptr<rtc::TestClient> client_;
 };
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 899a689..ca1a457 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1661,10 +1661,13 @@
   ]
   deps = [
     ":async_udp_socket",
+    ":buffer",
     ":gunit_helpers",
     ":rtc_base_tests_utils",
     ":threading",
     ":timeutils",
+    "../api/units:timestamp",
+    "network:received_packet",
     "synchronization:mutex",
   ]
 }
diff --git a/rtc_base/async_tcp_socket.cc b/rtc_base/async_tcp_socket.cc
index 367c5b0..eed4a31 100644
--- a/rtc_base/async_tcp_socket.cc
+++ b/rtc_base/async_tcp_socket.cc
@@ -294,8 +294,8 @@
     if (*len < kPacketLenSize + pkt_len)
       return;
 
-    SignalReadPacket(this, data + kPacketLenSize, pkt_len, remote_addr,
-                     TimeMicros());
+    NotifyPacketReceived(rtc::ReceivedPacket::CreateFromLegacy(
+        data + kPacketLenSize, pkt_len, rtc::TimeMicros(), remote_addr));
 
     *len -= kPacketLenSize + pkt_len;
     if (*len > 0) {
diff --git a/rtc_base/async_tcp_socket.h b/rtc_base/async_tcp_socket.h
index 541080f..90f77d6 100644
--- a/rtc_base/async_tcp_socket.h
+++ b/rtc_base/async_tcp_socket.h
@@ -13,6 +13,7 @@
 
 #include <stddef.h>
 
+#include <cstdint>
 #include <memory>
 
 #include "rtc_base/async_packet_socket.h"
diff --git a/rtc_base/async_udp_socket.cc b/rtc_base/async_udp_socket.cc
index af7ae56..358420a 100644
--- a/rtc_base/async_udp_socket.cc
+++ b/rtc_base/async_udp_socket.cc
@@ -136,8 +136,8 @@
 
   // TODO: Make sure that we got all of the packet.
   // If we did not, then we should resize our buffer to be large enough.
-  SignalReadPacket(this, buf_, static_cast<size_t>(len), remote_addr,
-                   timestamp);
+  NotifyPacketReceived(
+      rtc::ReceivedPacket::CreateFromLegacy(buf_, len, timestamp, remote_addr));
 }
 
 void AsyncUDPSocket::OnWriteEvent(Socket* socket) {
diff --git a/rtc_base/nat_unittest.cc b/rtc_base/nat_unittest.cc
index 19e5354..432985d 100644
--- a/rtc_base/nat_unittest.cc
+++ b/rtc_base/nat_unittest.cc
@@ -11,14 +11,17 @@
 #include <string.h>
 
 #include <algorithm>
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <vector>
 
 #include "absl/memory/memory.h"
+#include "api/units/time_delta.h"
 #include "rtc_base/async_packet_socket.h"
 #include "rtc_base/async_tcp_socket.h"
 #include "rtc_base/async_udp_socket.h"
+#include "rtc_base/event.h"
 #include "rtc_base/gunit.h"
 #include "rtc_base/ip_address.h"
 #include "rtc_base/logging.h"
@@ -80,29 +83,36 @@
   NATSocketFactory* natsf = new NATSocketFactory(
       internal, nat->internal_udp_address(), nat->internal_tcp_address());
 
-  TestClient* in = CreateTestClient(natsf, internal_addr);
-  TestClient* out[4];
-  for (int i = 0; i < 4; i++)
-    out[i] = CreateTestClient(external, external_addrs[i]);
-
   th_int.Start();
   th_ext.Start();
 
+  TestClient* in;
+  th_int.BlockingCall([&] { in = CreateTestClient(natsf, internal_addr); });
+
+  TestClient* out[4];
+  th_ext.BlockingCall([&] {
+    for (int i = 0; i < 4; i++)
+      out[i] = CreateTestClient(external, external_addrs[i]);
+  });
+
   const char* buf = "filter_test";
   size_t len = strlen(buf);
 
-  in->SendTo(buf, len, out[0]->address());
+  th_int.BlockingCall([&] { in->SendTo(buf, len, out[0]->address()); });
   SocketAddress trans_addr;
-  EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr));
+  th_ext.BlockingCall(
+      [&] { EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); });
 
   for (int i = 1; i < 4; i++) {
-    in->SendTo(buf, len, out[i]->address());
+    th_int.BlockingCall([&] { in->SendTo(buf, len, out[i]->address()); });
     SocketAddress trans_addr2;
-    EXPECT_TRUE(out[i]->CheckNextPacket(buf, len, &trans_addr2));
-    bool are_same = (trans_addr == trans_addr2);
-    ASSERT_EQ(are_same, exp_same) << "same translated address";
-    ASSERT_NE(AF_UNSPEC, trans_addr.family());
-    ASSERT_NE(AF_UNSPEC, trans_addr2.family());
+    th_ext.BlockingCall([&] {
+      EXPECT_TRUE(out[i]->CheckNextPacket(buf, len, &trans_addr2));
+      bool are_same = (trans_addr == trans_addr2);
+      ASSERT_EQ(are_same, exp_same) << "same translated address";
+      ASSERT_NE(AF_UNSPEC, trans_addr.family());
+      ASSERT_NE(AF_UNSPEC, trans_addr2.family());
+    });
   }
 
   th_int.Stop();
@@ -134,29 +144,39 @@
   NATSocketFactory* natsf = new NATSocketFactory(
       internal, nat->internal_udp_address(), nat->internal_tcp_address());
 
-  TestClient* in = CreateTestClient(natsf, internal_addr);
-  TestClient* out[4];
-  for (int i = 0; i < 4; i++)
-    out[i] = CreateTestClient(external, external_addrs[i]);
-
   th_int.Start();
   th_ext.Start();
 
+  TestClient* in = nullptr;
+  th_int.BlockingCall([&] { in = CreateTestClient(natsf, internal_addr); });
+
+  TestClient* out[4];
+  th_ext.BlockingCall([&] {
+    for (int i = 0; i < 4; i++)
+      out[i] = CreateTestClient(external, external_addrs[i]);
+  });
+
   const char* buf = "filter_test";
   size_t len = strlen(buf);
 
-  in->SendTo(buf, len, out[0]->address());
+  th_int.BlockingCall([&] { in->SendTo(buf, len, out[0]->address()); });
   SocketAddress trans_addr;
-  EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr));
+  th_ext.BlockingCall(
+      [&] { EXPECT_TRUE(out[0]->CheckNextPacket(buf, len, &trans_addr)); });
 
-  out[1]->SendTo(buf, len, trans_addr);
-  EXPECT_TRUE(CheckReceive(in, !filter_ip, buf, len));
+  th_ext.BlockingCall([&] { out[1]->SendTo(buf, len, trans_addr); });
+  th_int.BlockingCall(
+      [&] { EXPECT_TRUE(CheckReceive(in, !filter_ip, buf, len)); });
+  th_ext.BlockingCall([&] { out[2]->SendTo(buf, len, trans_addr); });
 
-  out[2]->SendTo(buf, len, trans_addr);
-  EXPECT_TRUE(CheckReceive(in, !filter_port, buf, len));
+  th_int.BlockingCall(
+      [&] { EXPECT_TRUE(CheckReceive(in, !filter_port, buf, len)); });
 
-  out[3]->SendTo(buf, len, trans_addr);
-  EXPECT_TRUE(CheckReceive(in, !filter_ip && !filter_port, buf, len));
+  th_ext.BlockingCall([&] { out[3]->SendTo(buf, len, trans_addr); });
+
+  th_int.BlockingCall([&] {
+    EXPECT_TRUE(CheckReceive(in, !filter_ip && !filter_port, buf, len));
+  });
 
   th_int.Stop();
   th_ext.Stop();
diff --git a/rtc_base/socket_unittest.cc b/rtc_base/socket_unittest.cc
index 0a41a77..f5ef2a3 100644
--- a/rtc_base/socket_unittest.cc
+++ b/rtc_base/socket_unittest.cc
@@ -1132,13 +1132,13 @@
   client2->SendTo("foo", 3, address);
   std::unique_ptr<TestClient::Packet> packet_1 = client1->NextPacket(10000);
   ASSERT_TRUE(packet_1 != nullptr);
-  EXPECT_NEAR(packet_1->packet_time_us, rtc::TimeMicros(), 1000'000);
+  EXPECT_NEAR(packet_1->packet_time->us(), rtc::TimeMicros(), 1000'000);
 
   Thread::SleepMs(100);
   client2->SendTo("bar", 3, address);
   std::unique_ptr<TestClient::Packet> packet_2 = client1->NextPacket(10000);
   ASSERT_TRUE(packet_2 != nullptr);
-  EXPECT_GT(packet_2->packet_time_us, packet_1->packet_time_us);
-  EXPECT_NEAR(packet_2->packet_time_us, rtc::TimeMicros(), 1000'000);
+  EXPECT_GT(packet_2->packet_time->us(), packet_1->packet_time->us());
+  EXPECT_NEAR(packet_2->packet_time->us(), rtc::TimeMicros(), 1000'000);
 }
 }  // namespace rtc
diff --git a/rtc_base/test_client.cc b/rtc_base/test_client.cc
index f23ac2a..87c9465 100644
--- a/rtc_base/test_client.cc
+++ b/rtc_base/test_client.cc
@@ -15,7 +15,9 @@
 #include <memory>
 #include <utility>
 
+#include "api/units/timestamp.h"
 #include "rtc_base/gunit.h"
+#include "rtc_base/network/received_packet.h"
 #include "rtc_base/thread.h"
 #include "rtc_base/time_utils.h"
 
@@ -30,10 +32,11 @@
 
 TestClient::TestClient(std::unique_ptr<AsyncPacketSocket> socket,
                        ThreadProcessingFakeClock* fake_clock)
-    : fake_clock_(fake_clock),
-      socket_(std::move(socket)),
-      prev_packet_timestamp_(-1) {
-  socket_->SignalReadPacket.connect(this, &TestClient::OnPacket);
+    : fake_clock_(fake_clock), socket_(std::move(socket)) {
+  socket_->RegisterReceivedPacketCallback(
+      [&](rtc::AsyncPacketSocket* socket, const rtc::ReceivedPacket& packet) {
+        OnPacket(socket, packet);
+      });
   socket_->SignalReadyToSend.connect(this, &TestClient::OnReadyToSend);
 }
 
@@ -100,20 +103,22 @@
   bool res = false;
   std::unique_ptr<Packet> packet = NextPacket(kTimeoutMs);
   if (packet) {
-    res = (packet->size == size && memcmp(packet->buf, buf, size) == 0 &&
-           CheckTimestamp(packet->packet_time_us));
+    res = (packet->buf.size() == size &&
+           memcmp(packet->buf.data(), buf, size) == 0 &&
+           CheckTimestamp(packet->packet_time));
     if (addr)
       *addr = packet->addr;
   }
   return res;
 }
 
-bool TestClient::CheckTimestamp(int64_t packet_timestamp) {
+bool TestClient::CheckTimestamp(
+    absl::optional<webrtc::Timestamp> packet_timestamp) {
   bool res = true;
-  if (packet_timestamp == -1) {
+  if (!packet_timestamp) {
     res = false;
   }
-  if (prev_packet_timestamp_ != -1) {
+  if (prev_packet_timestamp_) {
     if (packet_timestamp < prev_packet_timestamp_) {
       res = false;
     }
@@ -145,36 +150,24 @@
 }
 
 void TestClient::OnPacket(AsyncPacketSocket* socket,
-                          const char* buf,
-                          size_t size,
-                          const SocketAddress& remote_addr,
-                          const int64_t& packet_time_us) {
+                          const rtc::ReceivedPacket& received_packet) {
   webrtc::MutexLock lock(&mutex_);
-  packets_.push_back(
-      std::make_unique<Packet>(remote_addr, buf, size, packet_time_us));
+  packets_.push_back(std::make_unique<Packet>(received_packet));
 }
 
 void TestClient::OnReadyToSend(AsyncPacketSocket* socket) {
   ++ready_to_send_count_;
 }
 
-TestClient::Packet::Packet(const SocketAddress& a,
-                           const char* b,
-                           size_t s,
-                           int64_t packet_time_us)
-    : addr(a), buf(0), size(s), packet_time_us(packet_time_us) {
-  buf = new char[size];
-  memcpy(buf, b, size);
-}
+TestClient::Packet::Packet(const rtc::ReceivedPacket& received_packet)
+    : addr(received_packet.source_address()),
+      // Copy received_packet payload to a buffer owned by Packet.
+      buf(received_packet.payload().data(), received_packet.payload().size()),
+      packet_time(received_packet.arrival_time()) {}
 
 TestClient::Packet::Packet(const Packet& p)
-    : addr(p.addr), buf(0), size(p.size), packet_time_us(p.packet_time_us) {
-  buf = new char[size];
-  memcpy(buf, p.buf, size);
-}
-
-TestClient::Packet::~Packet() {
-  delete[] buf;
-}
+    : addr(p.addr),
+      buf(p.buf.data(), p.buf.size()),
+      packet_time(p.packet_time) {}
 
 }  // namespace rtc
diff --git a/rtc_base/test_client.h b/rtc_base/test_client.h
index dd91d37..6fe6fd5 100644
--- a/rtc_base/test_client.h
+++ b/rtc_base/test_client.h
@@ -14,8 +14,11 @@
 #include <memory>
 #include <vector>
 
+#include "api/units/timestamp.h"
 #include "rtc_base/async_udp_socket.h"
+#include "rtc_base/buffer.h"
 #include "rtc_base/fake_clock.h"
+#include "rtc_base/network/received_packet.h"
 #include "rtc_base/synchronization/mutex.h"
 
 namespace rtc {
@@ -26,17 +29,12 @@
  public:
   // Records the contents of a packet that was received.
   struct Packet {
-    Packet(const SocketAddress& a,
-           const char* b,
-           size_t s,
-           int64_t packet_time_us);
+    Packet(const rtc::ReceivedPacket& received_packet);
     Packet(const Packet& p);
-    virtual ~Packet();
 
     SocketAddress addr;
-    char* buf;
-    size_t size;
-    int64_t packet_time_us;
+    Buffer buf;
+    absl::optional<webrtc::Timestamp> packet_time;
   };
 
   // Default timeout for NextPacket reads.
@@ -96,14 +94,11 @@
   static const int kNoPacketTimeoutMs = 1000;
   // Workaround for the fact that AsyncPacketSocket::GetConnState doesn't exist.
   Socket::ConnState GetState();
-  // Slot for packets read on the socket.
+
   void OnPacket(AsyncPacketSocket* socket,
-                const char* buf,
-                size_t len,
-                const SocketAddress& remote_addr,
-                const int64_t& packet_time_us);
+                const rtc::ReceivedPacket& received_packet);
   void OnReadyToSend(AsyncPacketSocket* socket);
-  bool CheckTimestamp(int64_t packet_timestamp);
+  bool CheckTimestamp(absl::optional<webrtc::Timestamp> packet_timestamp);
   void AdvanceTime(int ms);
 
   ThreadProcessingFakeClock* fake_clock_ = nullptr;
@@ -111,7 +106,7 @@
   std::unique_ptr<AsyncPacketSocket> socket_;
   std::vector<std::unique_ptr<Packet>> packets_;
   int ready_to_send_count_ = 0;
-  int64_t prev_packet_timestamp_;
+  absl::optional<webrtc::Timestamp> prev_packet_timestamp_;
 };
 
 }  // namespace rtc