AsyncPacketSocket::RegisterReceivedPacketCallback

This cl introduce RegisterReceivedPacketCallback and
DeregisterReceivedPacketCallback that will be used to replace AsyncPacketSocket::SignalReadPacket

A "proof of concept" cl is here: https://webrtc-review.googlesource.com/c/src/+/327324

Bug: webrtc:15368
Change-Id: I07e4f564dc8420d78e542991689778d8531225df
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/327325
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41164}
diff --git a/BUILD.gn b/BUILD.gn
index 34c0c30..baf8b09 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -608,6 +608,7 @@
       "p2p:libstunprober_unittests",
       "p2p:rtc_p2p_unittests",
       "rtc_base:async_dns_resolver_unittests",
+      "rtc_base:async_packet_socket_unittest",
       "rtc_base:callback_list_unittests",
       "rtc_base:rtc_base_approved_unittests",
       "rtc_base:rtc_base_unittests",
diff --git a/p2p/base/port_unittest.cc b/p2p/base/port_unittest.cc
index 1d482c4..f3a01d0 100644
--- a/p2p/base/port_unittest.cc
+++ b/p2p/base/port_unittest.cc
@@ -266,14 +266,14 @@
   lconn->Ping(rtc::TimeMillis());
   ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(lport->last_stun_buf().size(), 0u);
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   clock->AdvanceTime(webrtc::TimeDelta::Millis(ms));
   ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(rport->last_stun_buf().size(), 0u);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 }
 
 class TestChannel : public sigslot::has_slots<> {
@@ -1494,8 +1494,8 @@
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
   IceMessage* msg = lport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
-  conn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  conn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                         rtc::SocketAddress(), absl::nullopt));
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
   msg = lport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type());
@@ -1528,10 +1528,8 @@
   lport->Reset();
   auto buf = std::make_unique<ByteBufferWriter>();
   WriteStunMessage(*modified_req, buf.get());
-  conn1->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf->Data(), buf->Length())),
-                          absl::nullopt));
+  conn1->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf->Data(), buf->Length(), /*packet_time_us=*/-1));
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
   msg = lport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_ERROR_RESPONSE, msg->type());
@@ -1564,8 +1562,8 @@
   IceMessage* msg = rport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
   // Send rport binding request to lport.
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
   EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type());
@@ -1897,14 +1895,14 @@
   std::unique_ptr<IceMessage> request = CopyStunMessage(*msg);
 
   // Receive the BINDING-REQUEST and respond with BINDING-RESPONSE.
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
   msg = rport->last_stun_msg();
   ASSERT_TRUE(msg != NULL);
   EXPECT_EQ(STUN_BINDING_RESPONSE, msg->type());
   // Received a BINDING-RESPONSE.
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   // Verify the STUN Stats.
   EXPECT_EQ(1U, lconn->stats().sent_ping_requests_total);
@@ -1984,12 +1982,12 @@
 
   // Respond with a BINDING-RESPONSE.
   request = CopyStunMessage(*msg);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
   msg = lport->last_stun_msg();
   // Receive the BINDING-RESPONSE.
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   // Verify the Stun ping stats.
   EXPECT_EQ(3U, rconn->stats().sent_ping_requests_total);
@@ -2040,8 +2038,8 @@
   lconn->Ping(0);
   ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(lport->last_stun_buf().size(), 0u);
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   EXPECT_EQ(nomination, rconn->remote_nomination());
   EXPECT_FALSE(lconn->nominated());
@@ -2053,8 +2051,8 @@
   // updating the acknowledged nomination of `lconn`.
   ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(rport->last_stun_buf().size(), 0u);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   EXPECT_EQ(nomination, lconn->acked_nomination());
   EXPECT_TRUE(lconn->nominated());
@@ -2181,8 +2179,8 @@
   IceMessage* msg = lport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
   // Pass the binding request to rport.
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   // Wait until rport sends the response and then check the remote network cost.
   ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout);
@@ -2512,8 +2510,8 @@
   // Send request.
   lconn->Ping(0);
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   // Intercept request and add comprehension required attribute.
   ASSERT_TRUE_WAIT(rport->last_stun_msg() != NULL, kDefaultTimeout);
@@ -2523,10 +2521,8 @@
   modified_response->AddFingerprint();
   ByteBufferWriter buf;
   WriteStunMessage(*modified_response, &buf);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf.Data(), buf.Length())),
-                          absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf.Data(), buf.Length(), /*packet_time_us=*/-1));
   // Response should have been ignored, leaving us unwritable still.
   EXPECT_FALSE(lconn->writable());
 }
@@ -2554,10 +2550,8 @@
   in_msg->AddFingerprint();
   ByteBufferWriter buf;
   WriteStunMessage(*in_msg, &buf);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf.Data(), buf.Length())),
-                          absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf.Data(), buf.Length(), /*packet_time_us=*/-1));
   EXPECT_EQ(0u, lconn->last_ping_received());
 }
 
@@ -2603,8 +2597,8 @@
   IceMessage* msg = rport->last_stun_msg();
   EXPECT_EQ(STUN_BINDING_REQUEST, msg->type());
   // Send rport binding request to lport.
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   ASSERT_TRUE_WAIT(lport->last_stun_msg() != NULL, kDefaultTimeout);
   EXPECT_EQ(STUN_BINDING_RESPONSE, lport->last_stun_msg()->type());
@@ -2613,10 +2607,8 @@
   // Adding a delay of 100ms.
   rtc::Thread::Current()->ProcessMessages(100);
   // Pinging lconn using stun indication message.
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf->Data(), buf->Length())),
-                          absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf->Data(), buf->Length(), /*packet_time_us=*/-1));
   int64_t last_ping_received2 = lconn->last_ping_received();
   EXPECT_GT(last_ping_received2, last_ping_received1);
 }
@@ -3113,8 +3105,8 @@
   con->SendStunBindingResponse(request.get());
 
   // Feeding the respone message from litemode to the full mode connection.
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(ice_lite_port->last_stun_buf(), absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
+      ice_lite_port->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
 
   // Verifying full mode connection becomes writable from the response.
   EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, ch1.conn()->write_state(),
@@ -3231,8 +3223,8 @@
                 GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
 
   // Feeding the respone message back.
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
+      port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
 
   port1->Reset();
   port2->Reset();
@@ -3415,10 +3407,8 @@
   modified_response->Write(&buf);
 
   // Feeding the modified respone message back.
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf.Data(), buf.Length())),
-                          absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf.Data(), buf.Length(), /*packet_time_us=*/-1));
 
   port1->Reset();
   port2->Reset();
@@ -3490,8 +3480,8 @@
   ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
 
   // Feeding the respone message back.
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
+      port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
 
   port1->Reset();
   port2->Reset();
@@ -3575,8 +3565,8 @@
   ASSERT_TRUE(GetSupportedGoogPingVersion(response) >= kGoogPingVersion);
 
   // Feeding the respone message back.
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(port2->last_stun_buf(), absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket(
+      port2->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
 
   port1->Reset();
   port2->Reset();
@@ -3602,10 +3592,8 @@
   rtc::ByteBufferWriter buf;
   error_response.Write(&buf);
 
-  ch1.conn()->OnReadPacket(
-      rtc::ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
-                              rtc::MakeArrayView(buf.Data(), buf.Length())),
-                          absl::nullopt));
+  ch1.conn()->OnReadPacket(rtc::ReceivedPacket::CreateFromLegacy(
+      buf.Data(), buf.Length(), /*packet_time_us=*/-1));
 
   // And now the third ping...this should be a binding.
   port1->Reset();
@@ -3842,8 +3830,8 @@
     lconn->Ping(rtc::TimeMillis());
     ASSERT_TRUE_WAIT(lport->last_stun_msg(), kDefaultTimeout);
     ASSERT_GT(lport->last_stun_buf().size(), 0u);
-    rconn->OnReadPacket(
-        rtc::ReceivedPacket(lport->last_stun_buf(), absl::nullopt));
+    rconn->OnReadPacket(rtc::ReceivedPacket(
+        lport->last_stun_buf(), rtc::SocketAddress(), absl::nullopt));
 
     clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
     ASSERT_TRUE_WAIT(rport->last_stun_msg(), kDefaultTimeout);
@@ -3857,7 +3845,8 @@
     rtc::BufferT<uint8_t> reply;
     SendPingAndCaptureReply(lconn, rconn, ms, &reply);
 
-    lconn->OnReadPacket(rtc::ReceivedPacket(reply, absl::nullopt));
+    lconn->OnReadPacket(
+        rtc::ReceivedPacket(reply, rtc::SocketAddress(), absl::nullopt));
   }
 
   void OnConnectionStateChange(Connection* connection) { num_state_changes_++; }
@@ -3918,7 +3907,8 @@
   EXPECT_FALSE(lconn->writable());
   EXPECT_FALSE(lconn->receiving());
 
-  lconn->OnReadPacket(rtc::ReceivedPacket(reply, absl::nullopt));
+  lconn->OnReadPacket(
+      rtc::ReceivedPacket(reply, rtc::SocketAddress(), absl::nullopt));
 
   // That reply was discarded due to the ForgetLearnedState() while it was
   // outstanding.
@@ -3990,15 +3980,15 @@
   lconn->Ping(rtc::TimeMillis(), std::move(delta));
   ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(lport_->last_stun_buf().size(), 0u);
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport_->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport_->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
   EXPECT_TRUE(received_goog_delta);
 
   clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
   ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(rport_->last_stun_buf().size(), 0u);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport_->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport_->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   EXPECT_TRUE(received_goog_delta_ack);
 }
@@ -4028,14 +4018,14 @@
   lconn->Ping(rtc::TimeMillis(), std::move(delta));
   ASSERT_TRUE_WAIT(lport_->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(lport_->last_stun_buf().size(), 0u);
-  rconn->OnReadPacket(
-      rtc::ReceivedPacket(lport_->last_stun_buf(), absl::nullopt));
+  rconn->OnReadPacket(rtc::ReceivedPacket(lport_->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
 
   clock_.AdvanceTime(webrtc::TimeDelta::Millis(ms));
   ASSERT_TRUE_WAIT(rport_->last_stun_msg(), kDefaultTimeout);
   ASSERT_GT(rport_->last_stun_buf().size(), 0u);
-  lconn->OnReadPacket(
-      rtc::ReceivedPacket(rport_->last_stun_buf(), absl::nullopt));
+  lconn->OnReadPacket(rtc::ReceivedPacket(rport_->last_stun_buf(),
+                                          rtc::SocketAddress(), absl::nullopt));
   EXPECT_TRUE(received_goog_delta_ack_error);
 }
 
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index ac237ed..9cf0522 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1369,10 +1369,12 @@
   ]
   deps = [
     ":callback_list",
+    ":checks",
     ":dscp",
     ":socket",
     ":timeutils",
     "../api:sequence_checker",
+    "network:received_packet",
     "network:sent_packet",
     "system:no_unique_address",
     "system:rtc_export",
@@ -1380,6 +1382,21 @@
   ]
 }
 
+if (rtc_include_tests) {
+  rtc_library("async_packet_socket_unittest") {
+    testonly = true
+    visibility = [ "*" ]
+    sources = [ "async_packet_socket_unittest.cc" ]
+    deps = [
+      ":async_packet_socket",
+      ":gunit_helpers",
+      "../test:test_support",
+      "network:received_packet",
+      "third_party/sigslot",
+    ]
+  }
+}
+
 rtc_library("mdns_responder_interface") {
   sources = [ "mdns_responder_interface.h" ]
   deps = [ ":ip_address" ]
diff --git a/rtc_base/async_packet_socket.cc b/rtc_base/async_packet_socket.cc
index f50138c..3721366 100644
--- a/rtc_base/async_packet_socket.cc
+++ b/rtc_base/async_packet_socket.cc
@@ -10,6 +10,8 @@
 
 #include "rtc_base/async_packet_socket.h"
 
+#include "rtc_base/checks.h"
+
 namespace rtc {
 
 PacketTimeUpdateParams::PacketTimeUpdateParams() = default;
@@ -38,6 +40,41 @@
   on_close_.RemoveReceivers(removal_tag);
 }
 
+void AsyncPacketSocket::RegisterReceivedPacketCallback(
+    absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
+        received_packet_callback) {
+  RTC_DCHECK_RUN_ON(&network_checker_);
+  RTC_CHECK(!received_packet_callback_);
+  SignalReadPacket.connect(this, &AsyncPacketSocket::NotifyPacketReceived);
+  received_packet_callback_ = std::move(received_packet_callback);
+}
+
+void AsyncPacketSocket::DeregisterReceivedPacketCallback() {
+  RTC_DCHECK_RUN_ON(&network_checker_);
+  SignalReadPacket.disconnect(this);
+  received_packet_callback_ = nullptr;
+}
+
+void AsyncPacketSocket::NotifyPacketReceived(
+    const rtc::ReceivedPacket& packet) {
+  RTC_DCHECK_RUN_ON(&network_checker_);
+  if (received_packet_callback_) {
+    received_packet_callback_(this, packet);
+    return;
+  }
+  if (SignalReadPacket.is_empty()) {
+    RTC_DCHECK_NOTREACHED() << " No listener registered";
+    return;
+  }
+  // TODO(bugs.webrtc.org:15368): Remove. This code path is only used if
+  // SignalReadyPacket is used  by clients to get notification of received
+  // packets but actual socket implementation use NotifyPacketReceived to
+  // trigger the notification.
+  SignalReadPacket(this, reinterpret_cast<const char*>(packet.payload().data()),
+                   packet.payload().size(), packet.source_address(),
+                   packet.arrival_time() ? packet.arrival_time()->us() : -1);
+}
+
 void CopySocketInformationToPacketInfo(size_t packet_size_bytes,
                                        const AsyncPacketSocket& socket_from,
                                        bool is_connectionless,
diff --git a/rtc_base/async_packet_socket.h b/rtc_base/async_packet_socket.h
index 0d3ceb9..768fcd4 100644
--- a/rtc_base/async_packet_socket.h
+++ b/rtc_base/async_packet_socket.h
@@ -11,11 +11,13 @@
 #ifndef RTC_BASE_ASYNC_PACKET_SOCKET_H_
 #define RTC_BASE_ASYNC_PACKET_SOCKET_H_
 
+#include <cstdint>
 #include <vector>
 
 #include "api/sequence_checker.h"
 #include "rtc_base/callback_list.h"
 #include "rtc_base/dscp.h"
+#include "rtc_base/network/received_packet.h"
 #include "rtc_base/network/sent_packet.h"
 #include "rtc_base/socket.h"
 #include "rtc_base/system/no_unique_address.h"
@@ -115,8 +117,14 @@
       std::function<void(AsyncPacketSocket*, int)> callback);
   void UnsubscribeCloseEvent(const void* removal_tag);
 
+  void RegisterReceivedPacketCallback(
+      absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
+          received_packet_callback);
+  void DeregisterReceivedPacketCallback();
+
   // Emitted each time a packet is read. Used only for UDP and
   // connected TCP sockets.
+  // TODO(bugs.webrtc.org:15368): Deprecate and remove.
   sigslot::signal5<AsyncPacketSocket*,
                    const char*,
                    size_t,
@@ -155,12 +163,26 @@
     on_close_.Send(this, err);
   }
 
+  // TODO(bugs.webrtc.org:15368): Deprecate and remove.
+  void NotifyPacketReceived(AsyncPacketSocket*,
+                            const char* data,
+                            size_t size,
+                            const SocketAddress& address,
+                            const int64_t& packet_time_us) {
+    NotifyPacketReceived(
+        ReceivedPacket::CreateFromLegacy(data, size, packet_time_us, address));
+  }
+
+  void NotifyPacketReceived(const rtc::ReceivedPacket& packet);
+
   RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker network_checker_{
       webrtc::SequenceChecker::kDetached};
 
  private:
   webrtc::CallbackList<AsyncPacketSocket*, int> on_close_
       RTC_GUARDED_BY(&network_checker_);
+  absl::AnyInvocable<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
+      received_packet_callback_ RTC_GUARDED_BY(&network_checker_);
 };
 
 // Listen socket, producing an AsyncPacketSocket when a peer connects.
diff --git a/rtc_base/async_packet_socket_unittest.cc b/rtc_base/async_packet_socket_unittest.cc
new file mode 100644
index 0000000..6cd4f09
--- /dev/null
+++ b/rtc_base/async_packet_socket_unittest.cc
@@ -0,0 +1,110 @@
+/*
+ *  Copyright 2023 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/async_packet_socket.h"
+
+#include "rtc_base/third_party/sigslot/sigslot.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+
+namespace rtc {
+namespace {
+
+using ::testing::MockFunction;
+
+class MockAsyncPacketSocket : public rtc::AsyncPacketSocket {
+ public:
+  ~MockAsyncPacketSocket() = default;
+
+  MOCK_METHOD(SocketAddress, GetLocalAddress, (), (const, override));
+  MOCK_METHOD(SocketAddress, GetRemoteAddress, (), (const, override));
+  MOCK_METHOD(int,
+              Send,
+              (const void* pv, size_t cb, const rtc::PacketOptions& options),
+              (override));
+
+  MOCK_METHOD(int,
+              SendTo,
+              (const void* pv,
+               size_t cb,
+               const SocketAddress& addr,
+               const rtc::PacketOptions& options),
+              (override));
+  MOCK_METHOD(int, Close, (), (override));
+  MOCK_METHOD(State, GetState, (), (const, override));
+  MOCK_METHOD(int,
+              GetOption,
+              (rtc::Socket::Option opt, int* value),
+              (override));
+  MOCK_METHOD(int, SetOption, (rtc::Socket::Option opt, int value), (override));
+  MOCK_METHOD(int, GetError, (), (const, override));
+  MOCK_METHOD(void, SetError, (int error), (override));
+
+  void NotifyPacketReceived() {
+    char data[1] = {'a'};
+    AsyncPacketSocket::NotifyPacketReceived(this, data, 1, SocketAddress(), -1);
+  }
+};
+
+TEST(AsyncPacketSocket, RegisteredCallbackReceivePacketsFromNotify) {
+  MockAsyncPacketSocket mock_socket;
+  MockFunction<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
+      received_packet;
+
+  EXPECT_CALL(received_packet, Call);
+  mock_socket.RegisterReceivedPacketCallback(received_packet.AsStdFunction());
+  mock_socket.NotifyPacketReceived();
+}
+
+TEST(AsyncPacketSocket, RegisteredCallbackReceivePacketsFromSignalReadPacket) {
+  MockAsyncPacketSocket mock_socket;
+  MockFunction<void(AsyncPacketSocket*, const rtc::ReceivedPacket&)>
+      received_packet;
+
+  EXPECT_CALL(received_packet, Call);
+  mock_socket.RegisterReceivedPacketCallback(received_packet.AsStdFunction());
+  char data[1] = {'a'};
+  mock_socket.SignalReadPacket(&mock_socket, data, 1, SocketAddress(), -1);
+}
+
+TEST(AsyncPacketSocket, SignalReadPacketTriggeredByNotifyPacketReceived) {
+  class SigslotPacketReceiver : public sigslot::has_slots<> {
+   public:
+    explicit SigslotPacketReceiver(rtc::AsyncPacketSocket& socket) {
+      socket.SignalReadPacket.connect(this,
+                                      &SigslotPacketReceiver::OnPacketReceived);
+    }
+
+    bool packet_received() const { return packet_received_; }
+
+   private:
+    void OnPacketReceived(AsyncPacketSocket*,
+                          const char*,
+                          size_t,
+                          const SocketAddress&,
+                          // TODO(bugs.webrtc.org/9584): Change to passing the
+                          // int64_t timestamp by value.
+                          const int64_t&) {
+      packet_received_ = true;
+    }
+
+    bool packet_received_ = false;
+  };
+
+  MockAsyncPacketSocket mock_socket;
+  SigslotPacketReceiver receiver(mock_socket);
+  ASSERT_FALSE(receiver.packet_received());
+
+  mock_socket.NotifyPacketReceived();
+  EXPECT_TRUE(receiver.packet_received());
+}
+
+}  // namespace
+}  // namespace rtc
diff --git a/rtc_base/network/BUILD.gn b/rtc_base/network/BUILD.gn
index a42745a..263bfcc 100644
--- a/rtc_base/network/BUILD.gn
+++ b/rtc_base/network/BUILD.gn
@@ -23,6 +23,7 @@
     "received_packet.h",
   ]
   deps = [
+    "..:socket_address",
     "../../api:array_view",
     "../../api/units:timestamp",
   ]
diff --git a/rtc_base/network/received_packet.cc b/rtc_base/network/received_packet.cc
index f27f01f..40d6e11 100644
--- a/rtc_base/network/received_packet.cc
+++ b/rtc_base/network/received_packet.cc
@@ -17,16 +17,22 @@
 namespace rtc {
 
 ReceivedPacket::ReceivedPacket(rtc::ArrayView<const uint8_t> payload,
+                               const SocketAddress& source_address,
                                absl::optional<webrtc::Timestamp> arrival_time)
-    : payload_(payload), arrival_time_(std::move(arrival_time)) {}
+    : payload_(payload),
+      arrival_time_(std::move(arrival_time)),
+      source_address_(source_address) {}
 
 // static
-ReceivedPacket ReceivedPacket::CreateFromLegacy(const char* data,
-                                                size_t size,
-                                                int64_t packet_time_us) {
+ReceivedPacket ReceivedPacket::CreateFromLegacy(
+    const char* data,
+    size_t size,
+    int64_t packet_time_us,
+    const rtc::SocketAddress& source_address) {
   RTC_DCHECK(packet_time_us == -1 || packet_time_us >= 0);
   return ReceivedPacket(rtc::reinterpret_array_view<const uint8_t>(
                             rtc::MakeArrayView(data, size)),
+                        source_address,
                         (packet_time_us >= 0)
                             ? absl::optional<webrtc::Timestamp>(
                                   webrtc::Timestamp::Micros(packet_time_us))
diff --git a/rtc_base/network/received_packet.h b/rtc_base/network/received_packet.h
index bcaf2a3..9b10099 100644
--- a/rtc_base/network/received_packet.h
+++ b/rtc_base/network/received_packet.h
@@ -15,6 +15,7 @@
 #include "absl/types/optional.h"
 #include "api/array_view.h"
 #include "api/units/timestamp.h"
+#include "rtc_base/socket_address.h"
 
 namespace rtc {
 
@@ -24,12 +25,15 @@
 // example it may contains STUN, SCTP, SRTP, RTP, RTCP.... etc.
 class ReceivedPacket {
  public:
-  // Caller must keep memory pointed to by payload valid for the lifetime of
-  // this ReceivedPacket.
+  // Caller must keep memory pointed to by payload and address valid for the
+  // lifetime of this ReceivedPacket.
   ReceivedPacket(
       rtc::ArrayView<const uint8_t> payload,
+      const SocketAddress& source_address,
       absl::optional<webrtc::Timestamp> arrival_time = absl::nullopt);
 
+  // Address/port of the packet sender.
+  const SocketAddress& source_address() const { return source_address_; }
   rtc::ArrayView<const uint8_t> payload() const { return payload_; }
 
   // Timestamp when this packet was received. Not available on all socket
@@ -38,13 +42,16 @@
     return arrival_time_;
   }
 
-  static ReceivedPacket CreateFromLegacy(const char* data,
-                                         size_t size,
-                                         int64_t packet_time_us);
+  static ReceivedPacket CreateFromLegacy(
+      const char* data,
+      size_t size,
+      int64_t packet_time_us,
+      const rtc::SocketAddress& = rtc::SocketAddress());
 
  private:
   rtc::ArrayView<const uint8_t> payload_;
   absl::optional<webrtc::Timestamp> arrival_time_;
+  const SocketAddress& source_address_;
 };
 
 }  // namespace rtc