Introduce Connection::RegisterReceivedPacketCallback

RegisterReceivedPacketCallback is used instead of
sigslot::SignalReadPacket. The callback use a new data class ReceivedPacket that combine meta
data and packet payload from a received packet.

This is the first step in an attempt to cleanup the data types used in
the packet receive pipeline.
Eventually, the ReceivedPacket class can contain more meta data such as
ECN information.

Bug: webrtc:11943,webrtc:15368
Change-Id: I984c561b9262fe4aa00176529bd8d901adf66640
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/325060
Reviewed-by: Jonas Oreland <jonaso@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#41021}
diff --git a/p2p/BUILD.gn b/p2p/BUILD.gn
index 4e4924a..596a9fe 100644
--- a/p2p/BUILD.gn
+++ b/p2p/BUILD.gn
@@ -161,6 +161,7 @@
     "../api/task_queue:pending_task_safety_flag",
     "../rtc_base:safe_minmax",
     "../rtc_base:weak_ptr",
+    "../rtc_base/network:received_packet",
     "../rtc_base/network:sent_packet",
     "../rtc_base/synchronization:mutex",
     "../rtc_base/system:rtc_export",
diff --git a/p2p/base/connection.cc b/p2p/base/connection.cc
index 94e5911..3bdba0e 100644
--- a/p2p/base/connection.cc
+++ b/p2p/base/connection.cc
@@ -13,6 +13,7 @@
 #include <math.h>
 
 #include <algorithm>
+#include <cstdint>
 #include <memory>
 #include <utility>
 #include <vector>
@@ -21,6 +22,9 @@
 #include "absl/strings/escaping.h"
 #include "absl/strings/match.h"
 #include "absl/strings/string_view.h"
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "api/units/timestamp.h"
 #include "p2p/base/port_allocator.h"
 #include "rtc_base/checks.h"
 #include "rtc_base/crc32.h"
@@ -246,6 +250,7 @@
 Connection::~Connection() {
   RTC_DCHECK_RUN_ON(network_thread_);
   RTC_DCHECK(!port_);
+  RTC_DCHECK(!received_packet_callback_);
 }
 
 webrtc::TaskQueueBase* Connection::network_thread() const {
@@ -445,6 +450,19 @@
   }
 }
 
+void Connection::RegisterReceivedPacketCallback(
+    absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
+        received_packet_callback) {
+  RTC_DCHECK_RUN_ON(network_thread_);
+  RTC_CHECK(!received_packet_callback_);
+  received_packet_callback_ = std::move(received_packet_callback);
+}
+
+void Connection::DeregisterReceivedPacketCallback() {
+  RTC_DCHECK_RUN_ON(network_thread_);
+  received_packet_callback_ = nullptr;
+}
+
 void Connection::OnReadPacket(const char* data,
                               size_t size,
                               int64_t packet_time_us) {
@@ -459,8 +477,22 @@
     UpdateReceiving(last_data_received_);
     recv_rate_tracker_.AddSamples(size);
     stats_.packets_received++;
-    SignalReadPacket(this, data, size, packet_time_us);
-
+    if (received_packet_callback_) {
+      RTC_DCHECK(packet_time_us == -1 || packet_time_us >= 0);
+      RTC_DCHECK(SignalReadPacket.is_empty());
+      received_packet_callback_(
+          this, rtc::ReceivedPacket(
+                    rtc::reinterpret_array_view<const uint8_t>(
+                        rtc::MakeArrayView(data, size)),
+                    (packet_time_us >= 0)
+                        ? absl::optional<webrtc::Timestamp>(
+                              webrtc::Timestamp::Micros(packet_time_us))
+                        : absl::nullopt));
+    } else {
+      // TODO(webrtc:11943): Remove SignalReadPacket once upstream projects have
+      // switched to use RegisterReceivedPacket.
+      SignalReadPacket(this, data, size, packet_time_us);
+    }
     // If timed out sending writability checks, start up again
     if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) {
       RTC_LOG(LS_WARNING)
diff --git a/p2p/base/connection.h b/p2p/base/connection.h
index 1ea9180..3481c69 100644
--- a/p2p/base/connection.h
+++ b/p2p/base/connection.h
@@ -28,6 +28,7 @@
 #include "p2p/base/transport_description.h"
 #include "rtc_base/async_packet_socket.h"
 #include "rtc_base/network.h"
+#include "rtc_base/network/received_packet.h"
 #include "rtc_base/numerics/event_based_exponential_moving_average.h"
 #include "rtc_base/rate_tracker.h"
 #include "rtc_base/system/rtc_export.h"
@@ -146,8 +147,16 @@
   // Error if Send() returns < 0
   virtual int GetError() = 0;
 
+  // TODO(webrtc:11943): Remove SignalReadPacket once upstream projects have
+  // switched to use RegisterReceivedPacket.
   sigslot::signal4<Connection*, const char*, size_t, int64_t> SignalReadPacket;
 
+  // Register as a recipient of received packets. There can only be one.
+  void RegisterReceivedPacketCallback(
+      absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
+          received_packet_callback);
+  void DeregisterReceivedPacketCallback();
+
   sigslot::signal1<Connection*> SignalReadyToSend;
 
   // Called when a packet is received on this connection.
@@ -501,6 +510,8 @@
   absl::optional<
       std::function<void(webrtc::RTCErrorOr<const StunUInt64Attribute*>)>>
       goog_delta_ack_consumer_;
+  absl::AnyInvocable<void(Connection*, const rtc::ReceivedPacket&)>
+      received_packet_callback_;
 };
 
 // ProxyConnection defers all the interesting work to the port.
diff --git a/p2p/base/p2p_transport_channel.cc b/p2p/base/p2p_transport_channel.cc
index fe11920..5ddab77 100644
--- a/p2p/base/p2p_transport_channel.cc
+++ b/p2p/base/p2p_transport_channel.cc
@@ -276,8 +276,10 @@
   connection->set_unwritable_timeout(config_.ice_unwritable_timeout);
   connection->set_unwritable_min_checks(config_.ice_unwritable_min_checks);
   connection->set_inactive_timeout(config_.ice_inactive_timeout);
-  connection->SignalReadPacket.connect(this,
-                                       &P2PTransportChannel::OnReadPacket);
+  connection->RegisterReceivedPacketCallback(
+      [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+        OnReadPacket(connection, packet);
+      });
   connection->SignalReadyToSend.connect(this,
                                         &P2PTransportChannel::OnReadyToSend);
   connection->SignalStateChange.connect(
@@ -2151,6 +2153,7 @@
   RTC_DCHECK_RUN_ON(network_thread_);
   auto it = absl::c_find(connections_, connection);
   RTC_DCHECK(it != connections_.end());
+  connection->DeregisterReceivedPacketCallback();
   connections_.erase(it);
   connection->ClearStunDictConsumer();
   ice_controller_->OnConnectionDestroyed(connection);
@@ -2221,39 +2224,30 @@
 
 // We data is available, let listeners know
 void P2PTransportChannel::OnReadPacket(Connection* connection,
-                                       const char* data,
-                                       size_t len,
-                                       int64_t packet_time_us) {
+                                       const rtc::ReceivedPacket& packet) {
   RTC_DCHECK_RUN_ON(network_thread_);
-
-  if (connection == selected_connection_) {
-    // Let the client know of an incoming packet
-    packets_received_++;
-    bytes_received_ += len;
-    RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_);
-    last_data_received_ms_ =
-        std::max(last_data_received_ms_, connection->last_data_received());
-    SignalReadPacket(this, data, len, packet_time_us, 0);
+  if (connection != selected_connection_ && !FindConnection(connection)) {
+    // Do not deliver, if packet doesn't belong to the correct transport
+    // channel.
+    RTC_DCHECK_NOTREACHED();
     return;
   }
 
-  // Do not deliver, if packet doesn't belong to the correct transport
-  // channel.
-  if (!FindConnection(connection))
-    return;
+    // Let the client know of an incoming packet
+    packets_received_++;
+    bytes_received_ += packet.payload().size();
+    RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_);
+    last_data_received_ms_ =
+        std::max(last_data_received_ms_, connection->last_data_received());
 
-  packets_received_++;
-  bytes_received_ += len;
-  RTC_DCHECK(connection->last_data_received() >= last_data_received_ms_);
-  last_data_received_ms_ =
-      std::max(last_data_received_ms_, connection->last_data_received());
-
-  // Let the client know of an incoming packet
-  SignalReadPacket(this, data, len, packet_time_us, 0);
+    SignalReadPacket(
+        this, reinterpret_cast<const char*>(packet.payload().data()),
+        packet.payload().size(),
+        packet.arrival_time() ? packet.arrival_time()->us() : -1, 0);
 
   // May need to switch the sending connection based on the receiving media
   // path if this is the controlled side.
-  if (ice_role_ == ICEROLE_CONTROLLED) {
+  if (ice_role_ == ICEROLE_CONTROLLED && connection != selected_connection_) {
     ice_controller_->OnImmediateSwitchRequest(IceSwitchReason::DATA_RECEIVED,
                                               connection);
   }
diff --git a/p2p/base/p2p_transport_channel.h b/p2p/base/p2p_transport_channel.h
index 1e0d1e3..7f85018 100644
--- a/p2p/base/p2p_transport_channel.h
+++ b/p2p/base/p2p_transport_channel.h
@@ -342,10 +342,7 @@
   void OnRoleConflict(PortInterface* port);
 
   void OnConnectionStateChange(Connection* connection);
-  void OnReadPacket(Connection* connection,
-                    const char* data,
-                    size_t len,
-                    int64_t packet_time_us);
+  void OnReadPacket(Connection* connection, const rtc::ReceivedPacket& packet);
   void OnSentPacket(const rtc::SentPacket& sent_packet);
   void OnReadyToSend(Connection* connection);
   void OnConnectionDestroyed(Connection* connection);
diff --git a/p2p/base/turn_port_unittest.cc b/p2p/base/turn_port_unittest.cc
index cf9ca09..55706e1 100644
--- a/p2p/base/turn_port_unittest.cc
+++ b/p2p/base/turn_port_unittest.cc
@@ -216,19 +216,7 @@
                             bool /*port_muxed*/) {
     turn_unknown_address_ = true;
   }
-  void OnTurnReadPacket(Connection* conn,
-                        const char* data,
-                        size_t size,
-                        int64_t packet_time_us) {
-    turn_packets_.push_back(rtc::Buffer(data, size));
-  }
   void OnUdpPortComplete(Port* port) { udp_ready_ = true; }
-  void OnUdpReadPacket(Connection* conn,
-                       const char* data,
-                       size_t size,
-                       int64_t packet_time_us) {
-    udp_packets_.push_back(rtc::Buffer(data, size));
-  }
   void OnSocketReadPacket(rtc::AsyncPacketSocket* socket,
                           const char* data,
                           size_t size,
@@ -248,6 +236,10 @@
   }
   void OnTurnPortClosed() override { turn_port_closed_ = true; }
 
+  void OnConnectionSignalDestroyed(Connection* connection) {
+    connection->DeregisterReceivedPacketCallback();
+  }
+
   rtc::Socket* CreateServerSocket(const SocketAddress addr) {
     rtc::Socket* socket = ss_->CreateSocket(AF_INET, SOCK_STREAM);
     EXPECT_GE(socket->Bind(addr), 0);
@@ -727,10 +719,20 @@
                                                     Port::ORIGIN_MESSAGE);
     ASSERT_TRUE(conn1 != NULL);
     ASSERT_TRUE(conn2 != NULL);
-    conn1->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
-                                    &TurnPortTest::OnTurnReadPacket);
-    conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
-                                    &TurnPortTest::OnUdpReadPacket);
+    conn1->RegisterReceivedPacketCallback(
+        [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+          turn_packets_.push_back(
+              rtc::Buffer(packet.payload().data(), packet.payload().size()));
+        });
+    conn1->SignalDestroyed.connect(this,
+                                   &TurnPortTest::OnConnectionSignalDestroyed);
+    conn2->RegisterReceivedPacketCallback(
+        [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+          udp_packets_.push_back(
+              rtc::Buffer(packet.payload().data(), packet.payload().size()));
+        });
+    conn2->SignalDestroyed.connect(this,
+                                   &TurnPortTest::OnConnectionSignalDestroyed);
     conn1->Ping(0);
     EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(),
                              kSimulatedRtt * 2, fake_clock_);
@@ -780,10 +782,21 @@
                                                     Port::ORIGIN_MESSAGE);
     ASSERT_TRUE(conn1 != NULL);
     ASSERT_TRUE(conn2 != NULL);
-    conn1->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
-                                    &TurnPortTest::OnTurnReadPacket);
-    conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
-                                    &TurnPortTest::OnUdpReadPacket);
+    conn1->RegisterReceivedPacketCallback(
+        [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+          turn_packets_.push_back(
+              rtc::Buffer(packet.payload().data(), packet.payload().size()));
+        });
+    conn1->SignalDestroyed.connect(this,
+                                   &TurnPortTest::OnConnectionSignalDestroyed);
+    conn2->RegisterReceivedPacketCallback(
+        [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+          udp_packets_.push_back(
+              rtc::Buffer(packet.payload().data(), packet.payload().size()));
+        });
+    conn2->SignalDestroyed.connect(this,
+                                   &TurnPortTest::OnConnectionSignalDestroyed);
+
     conn1->Ping(0);
     EXPECT_EQ_SIMULATED_WAIT(Connection::STATE_WRITABLE, conn1->write_state(),
                              kSimulatedRtt * 2, fake_clock_);
@@ -1507,10 +1520,15 @@
                              kSimulatedRtt, fake_clock_);
   // Verify that packets are allowed to be sent after a bind request error.
   // They'll just use a send indication instead.
-  conn2->SignalReadPacket.connect(static_cast<TurnPortTest*>(this),
-                                  &TurnPortTest::OnUdpReadPacket);
+
+  conn2->RegisterReceivedPacketCallback(
+      [&](Connection* connection, const rtc::ReceivedPacket& packet) {
+        udp_packets_.push_back(
+            rtc::Buffer(packet.payload().data(), packet.payload().size()));
+      });
   conn1->Send(data.data(), data.length(), options);
   EXPECT_TRUE_SIMULATED_WAIT(!udp_packets_.empty(), kSimulatedRtt, fake_clock_);
+  conn2->DeregisterReceivedPacketCallback();
 }
 
 // Do a TURN allocation, establish a UDP connection, and send some data.
diff --git a/rtc_base/network/BUILD.gn b/rtc_base/network/BUILD.gn
index 35ae3d4..a42745a 100644
--- a/rtc_base/network/BUILD.gn
+++ b/rtc_base/network/BUILD.gn
@@ -16,3 +16,18 @@
   deps = [ "../system:rtc_export" ]
   absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
 }
+
+rtc_library("received_packet") {
+  sources = [
+    "received_packet.cc",
+    "received_packet.h",
+  ]
+  deps = [
+    "../../api:array_view",
+    "../../api/units:timestamp",
+  ]
+  absl_deps = [
+    "//third_party/abseil-cpp/absl/functional:any_invocable",
+    "//third_party/abseil-cpp/absl/types:optional",
+  ]
+}
diff --git a/rtc_base/network/received_packet.cc b/rtc_base/network/received_packet.cc
new file mode 100644
index 0000000..9612c3d
--- /dev/null
+++ b/rtc_base/network/received_packet.cc
@@ -0,0 +1,23 @@
+/*
+ *  Copyright 2023 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "rtc_base/network/received_packet.h"
+
+#include <utility>
+
+#include "absl/types/optional.h"
+
+namespace rtc {
+
+ReceivedPacket::ReceivedPacket(rtc::ArrayView<const uint8_t> payload,
+                               absl::optional<webrtc::Timestamp> arrival_time)
+    : payload_(payload), arrival_time_(std::move(arrival_time)) {}
+
+}  // namespace rtc
diff --git a/rtc_base/network/received_packet.h b/rtc_base/network/received_packet.h
new file mode 100644
index 0000000..7f8b2f9
--- /dev/null
+++ b/rtc_base/network/received_packet.h
@@ -0,0 +1,47 @@
+/*
+ *  Copyright 2023 The WebRTC Project Authors. All rights reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef RTC_BASE_NETWORK_RECEIVED_PACKET_H_
+#define RTC_BASE_NETWORK_RECEIVED_PACKET_H_
+
+#include <cstdint>
+
+#include "absl/types/optional.h"
+#include "api/array_view.h"
+#include "api/units/timestamp.h"
+
+namespace rtc {
+
+// ReceivedPacket repressent a received IP packet.
+// It contains a payload and metadata.
+// ReceivedPacket itself does not put constraints on what payload contains. For
+// example it may contains STUN, SCTP, SRTP, RTP, RTCP.... etc.
+class ReceivedPacket {
+ public:
+  // Caller must keep memory pointed to by payload valid for the lifetime of
+  // this ReceivedPacket.
+  ReceivedPacket(
+      rtc::ArrayView<const uint8_t> payload,
+      absl::optional<webrtc::Timestamp> arrival_time = absl::nullopt);
+
+  rtc::ArrayView<const uint8_t> payload() const { return payload_; }
+
+  // Timestamp when this packet was received. Not available on all socket
+  // implementations.
+  absl::optional<webrtc::Timestamp> arrival_time() const {
+    return arrival_time_;
+  }
+
+ private:
+  rtc::ArrayView<const uint8_t> payload_;
+  absl::optional<webrtc::Timestamp> arrival_time_;
+};
+
+}  // namespace rtc
+#endif  // RTC_BASE_NETWORK_RECEIVED_PACKET_H_