Replace Socket's sigslot ConnectEvent and CloseEvent with siglsot trampoline
Bug: webrtc:42222066
Change-Id: I9c3a5bf1b826480f91c48c65f9bcf403d6b7a7d0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/413581
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Lena Kaplan <lenakaplan@meta.com>
Cr-Commit-Position: refs/heads/main@{#45821}
diff --git a/examples/peerconnection/client/peer_connection_client.cc b/examples/peerconnection/client/peer_connection_client.cc
index 5afed42..d4701d0 100644
--- a/examples/peerconnection/client/peer_connection_client.cc
+++ b/examples/peerconnection/client/peer_connection_client.cc
@@ -55,13 +55,14 @@
void PeerConnectionClient::InitSocketSignals() {
RTC_DCHECK(control_socket_.get() != nullptr);
RTC_DCHECK(hanging_get_.get() != nullptr);
- control_socket_->SignalCloseEvent.connect(this,
- &PeerConnectionClient::OnClose);
- hanging_get_->SignalCloseEvent.connect(this, &PeerConnectionClient::OnClose);
- control_socket_->SignalConnectEvent.connect(this,
- &PeerConnectionClient::OnConnect);
- hanging_get_->SignalConnectEvent.connect(
- this, &PeerConnectionClient::OnHangingGetConnect);
+ control_socket_->SubscribeCloseEvent(
+ [this](webrtc::Socket* socket, int error) { OnClose(socket, error); });
+ hanging_get_->SubscribeCloseEvent(
+ [this](webrtc::Socket* socket, int error) { OnClose(socket, error); });
+ control_socket_->SubscribeConnectEvent(
+ [this](webrtc::Socket* socket) { OnConnect(socket); });
+ hanging_get_->SubscribeConnectEvent(
+ [this](webrtc::Socket* socket) { OnHangingGetConnect(socket); });
control_socket_->SignalReadEvent.connect(this, &PeerConnectionClient::OnRead);
hanging_get_->SignalReadEvent.connect(
this, &PeerConnectionClient::OnHangingGetRead);
diff --git a/p2p/test/nat_socket_factory.cc b/p2p/test/nat_socket_factory.cc
index 3404615..9cf11c9 100644
--- a/p2p/test/nat_socket_factory.cc
+++ b/p2p/test/nat_socket_factory.cc
@@ -252,7 +252,7 @@
RTC_DCHECK(socket == socket_);
if (server_addr_.IsNil()) {
connected_ = true;
- SignalConnectEvent(this);
+ NotifyConnectEvent(this);
} else {
SendConnectRequest();
}
@@ -272,7 +272,7 @@
}
void OnCloseEvent(Socket* socket, int error) {
RTC_DCHECK(socket == socket_);
- SignalCloseEvent(this, error);
+ NotifyCloseEvent(this, error);
}
private:
@@ -283,10 +283,13 @@
socket_ = sf_->CreateInternalSocket(family_, type_, addr, &server_addr_);
result = (socket_) ? socket_->Bind(addr) : -1;
if (result >= 0) {
- socket_->SignalConnectEvent.connect(this, &NATSocket::OnConnectEvent);
+ socket_->SubscribeConnectEvent(
+ [this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &NATSocket::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &NATSocket::OnWriteEvent);
- socket_->SignalCloseEvent.connect(this, &NATSocket::OnCloseEvent);
+ socket_->SubscribeCloseEvent(this, [this](Socket* socket, int error) {
+ OnCloseEvent(socket, error);
+ });
} else {
server_addr_.Clear();
delete socket_;
@@ -309,10 +312,10 @@
socket_->Recv(&code, sizeof(code), nullptr);
if (code == 0) {
connected_ = true;
- SignalConnectEvent(this);
+ NotifyConnectEvent(this);
} else {
Close();
- SignalCloseEvent(this, code);
+ NotifyCloseEvent(this, code);
}
}
diff --git a/p2p/test/nat_unittest.cc b/p2p/test/nat_unittest.cc
index 441c373..6d9ee05 100644
--- a/p2p/test/nat_unittest.cc
+++ b/p2p/test/nat_unittest.cc
@@ -368,7 +368,8 @@
void ConnectEvents() {
server_->SignalReadEvent.connect(this, &NatTcpTest::OnAcceptEvent);
- client_->SignalConnectEvent.connect(this, &NatTcpTest::OnConnectEvent);
+ client_->SubscribeConnectEvent(
+ [this](Socket* socket) { OnConnectEvent(socket); });
}
const Environment env_;
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index cb00a44..78e6bd7 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1050,6 +1050,7 @@
"async_socket.h",
]
deps = [
+ ":callback_list",
":checks",
":socket",
":socket_address",
@@ -1068,11 +1069,13 @@
":checks",
":ip_address",
":net_helpers",
+ ":sigslot_trampoline",
":socket_address",
"../api/transport:ecn_marking",
"../api/units:timestamp",
"system:rtc_export",
"third_party/sigslot",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
]
if (is_win) {
deps += [ ":win32" ]
diff --git a/rtc_base/async_socket.cc b/rtc_base/async_socket.cc
index 1571811..f504254 100644
--- a/rtc_base/async_socket.cc
+++ b/rtc_base/async_socket.cc
@@ -23,11 +23,12 @@
AsyncSocketAdapter::AsyncSocketAdapter(Socket* socket)
: socket_(absl::WrapUnique(socket)) {
RTC_DCHECK(socket_);
- socket_->SignalConnectEvent.connect(this,
- &AsyncSocketAdapter::OnConnectEvent);
+ socket_->SubscribeConnectEvent(
+ [this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &AsyncSocketAdapter::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncSocketAdapter::OnWriteEvent);
- socket_->SignalCloseEvent.connect(this, &AsyncSocketAdapter::OnCloseEvent);
+ socket_->SubscribeCloseEvent(
+ [this](Socket* socket, int err) { OnCloseEvent(socket, err); });
}
SocketAddress AsyncSocketAdapter::GetLocalAddress() const {
@@ -100,7 +101,7 @@
}
void AsyncSocketAdapter::OnConnectEvent(Socket* socket) {
- SignalConnectEvent(this);
+ NotifyConnectEvent(this);
}
void AsyncSocketAdapter::OnReadEvent(Socket* socket) {
@@ -112,7 +113,7 @@
}
void AsyncSocketAdapter::OnCloseEvent(Socket* socket, int err) {
- SignalCloseEvent(this, err);
+ NotifyCloseEvent(this, err);
}
} // namespace webrtc
diff --git a/rtc_base/async_tcp_socket.cc b/rtc_base/async_tcp_socket.cc
index 7bf3ec8..29f611c 100644
--- a/rtc_base/async_tcp_socket.cc
+++ b/rtc_base/async_tcp_socket.cc
@@ -58,11 +58,12 @@
max_outsize_(max_packet_size) {
inbuf_.EnsureCapacity(kMinimumRecvSize);
- socket_->SignalConnectEvent.connect(this,
- &AsyncTCPSocketBase::OnConnectEvent);
+ socket_->SubscribeConnectEvent(
+ [this](Socket* socket) { OnConnectEvent(socket); });
socket_->SignalReadEvent.connect(this, &AsyncTCPSocketBase::OnReadEvent);
socket_->SignalWriteEvent.connect(this, &AsyncTCPSocketBase::OnWriteEvent);
- socket_->SignalCloseEvent.connect(this, &AsyncTCPSocketBase::OnCloseEvent);
+ socket_->SubscribeCloseEvent(
+ [this](Socket* socket, int error) { OnCloseEvent(socket, error); });
}
AsyncTCPSocketBase::~AsyncTCPSocketBase() {}
diff --git a/rtc_base/physical_socket_server.cc b/rtc_base/physical_socket_server.cc
index 10adcfe..c7bc1ed 100644
--- a/rtc_base/physical_socket_server.cc
+++ b/rtc_base/physical_socket_server.cc
@@ -658,7 +658,7 @@
if (error) {
SetError(error);
- SignalCloseEvent(this, error);
+ NotifyCloseEvent(this, error);
}
}
@@ -1031,7 +1031,7 @@
// something like a READ followed by a CONNECT, which would be odd.
if ((ff & DE_CONNECT) != 0) {
DisableEvents(DE_CONNECT);
- SignalConnectEvent(this);
+ NotifyConnectEvent(this);
}
if ((ff & DE_ACCEPT) != 0) {
DisableEvents(DE_ACCEPT);
@@ -1048,7 +1048,7 @@
if ((ff & DE_CLOSE) != 0) {
// The socket is now dead to us, so stop checking it.
SetEnabledEvents(0);
- SignalCloseEvent(this, err);
+ NotifyCloseEvent(this, err);
}
#if defined(WEBRTC_USE_EPOLL)
FinishBatchedEventUpdates();
diff --git a/rtc_base/proxy_server.cc b/rtc_base/proxy_server.cc
index 6abb15b..d5c9c56 100644
--- a/rtc_base/proxy_server.cc
+++ b/rtc_base/proxy_server.cc
@@ -79,12 +79,14 @@
});
int_socket_->SignalReadEvent.connect(this, &ProxyBinding::OnInternalRead);
int_socket_->SignalWriteEvent.connect(this, &ProxyBinding::OnInternalWrite);
- int_socket_->SignalCloseEvent.connect(this, &ProxyBinding::OnInternalClose);
- ext_socket_->SignalConnectEvent.connect(this,
- &ProxyBinding::OnExternalConnect);
+ int_socket_->SubscribeCloseEvent(
+ [this](Socket* socket, int error) { OnInternalClose(socket, error); });
+ ext_socket_->SubscribeConnectEvent(
+ [this](Socket* socket) { OnExternalConnect(socket); });
ext_socket_->SignalReadEvent.connect(this, &ProxyBinding::OnExternalRead);
ext_socket_->SignalWriteEvent.connect(this, &ProxyBinding::OnExternalWrite);
- ext_socket_->SignalCloseEvent.connect(this, &ProxyBinding::OnExternalClose);
+ ext_socket_->SubscribeCloseEvent(
+ [this](Socket* socket, int error) { OnExternalClose(socket, error); });
}
ProxyBinding::~ProxyBinding() = default;
diff --git a/rtc_base/server_socket_adapters.cc b/rtc_base/server_socket_adapters.cc
index bf41ce9..6237891 100644
--- a/rtc_base/server_socket_adapters.cc
+++ b/rtc_base/server_socket_adapters.cc
@@ -43,7 +43,7 @@
if (memcmp(client_hello.data(), data, client_hello.size()) != 0) {
Close();
- SignalCloseEvent(this, 0);
+ NotifyCloseEvent(this, 0);
return;
}
diff --git a/rtc_base/socket.h b/rtc_base/socket.h
index ad4e132..67f3c73 100644
--- a/rtc_base/socket.h
+++ b/rtc_base/socket.h
@@ -15,11 +15,14 @@
#include <cstddef>
#include <cstdint>
#include <optional>
+#include <utility>
+#include "absl/functional/any_invocable.h"
#include "api/transport/ecn_marking.h"
#include "api/units/timestamp.h"
#include "rtc_base/buffer.h"
#include "rtc_base/checks.h"
+#include "rtc_base/sigslot_trampoline.h"
#include "rtc_base/socket_address.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
@@ -161,14 +164,43 @@
sigslot::signal1<Socket*, sigslot::multi_threaded_local> SignalReadEvent;
// ready to write
sigslot::signal1<Socket*, sigslot::multi_threaded_local> SignalWriteEvent;
- sigslot::signal1<Socket*> SignalConnectEvent; // connected
+ sigslot::signal1<Socket*> SignalConnectEvent; // connected
+ void SubscribeConnectEvent(void* tag,
+ absl::AnyInvocable<void(Socket*)> callback) {
+ connect_event_trampoline_.Subscribe(tag, std::move(callback));
+ }
+ void UnsubscribeConnectEvent(void* tag) {
+ connect_event_trampoline_.Unsubscribe(tag);
+ }
+ void SubscribeConnectEvent(absl::AnyInvocable<void(Socket*)> callback) {
+ connect_event_trampoline_.Subscribe(std::move(callback));
+ }
+ void NotifyConnectEvent(Socket* socket) { SignalConnectEvent(socket); }
+
sigslot::signal2<Socket*, int> SignalCloseEvent; // closed
+ void SubscribeCloseEvent(void* tag,
+ absl::AnyInvocable<void(Socket*, int)> callback) {
+ close_event_trampoline_.Subscribe(tag, std::move(callback));
+ }
+ void UnsubscribeCloseEvent(void* tag) {
+ close_event_trampoline_.Unsubscribe(tag);
+ }
+ void SubscribeCloseEvent(absl::AnyInvocable<void(Socket*, int)> callback) {
+ close_event_trampoline_.Subscribe(std::move(callback));
+ }
+ void NotifyCloseEvent(Socket* socket, int error) {
+ SignalCloseEvent(socket, error);
+ }
protected:
- Socket() {}
+ Socket() : connect_event_trampoline_(this), close_event_trampoline_(this) {}
+
+ private:
+ SignalTrampoline<Socket, &Socket::SignalConnectEvent>
+ connect_event_trampoline_;
+ SignalTrampoline<Socket, &Socket::SignalCloseEvent> close_event_trampoline_;
};
} // namespace webrtc
-
#endif // RTC_BASE_SOCKET_H_
diff --git a/rtc_base/socket_adapters.cc b/rtc_base/socket_adapters.cc
index a3ff8bd..04e69f2 100644
--- a/rtc_base/socket_adapters.cc
+++ b/rtc_base/socket_adapters.cc
@@ -184,7 +184,7 @@
if (res != sizeof(kSslClientHello)) {
RTC_LOG(LS_ERROR) << "Sending fake SSL ClientHello message failed.";
Close();
- SignalCloseEvent(this, 0);
+ NotifyCloseEvent(this, 0);
}
}
@@ -195,7 +195,7 @@
if (memcmp(kSslServerHello, data, sizeof(kSslServerHello)) != 0) {
RTC_LOG(LS_ERROR) << "Received non-matching fake SSL ServerHello message.";
Close();
- SignalCloseEvent(this, 0); // TODO: error code?
+ NotifyCloseEvent(this, 0); // TODO: error code?
return;
}
@@ -206,7 +206,7 @@
bool remainder = (*len > 0);
BufferInput(false);
- SignalConnectEvent(this);
+ NotifyConnectEvent(this);
// FIX: if SignalConnect causes the socket to be destroyed, we are in trouble
if (remainder)
diff --git a/rtc_base/socket_unittest.cc b/rtc_base/socket_unittest.cc
index d04f23b..0d700c8 100644
--- a/rtc_base/socket_unittest.cc
+++ b/rtc_base/socket_unittest.cc
@@ -682,7 +682,9 @@
std::unique_ptr<Socket> client =
socket_factory_->Create(loopback.family(), SOCK_STREAM);
sink.Monitor(client.get());
- client->SignalCloseEvent.connect(&closer, &SocketCloser::OnClose);
+ client->SubscribeCloseEvent([&closer](webrtc::Socket* socket, int error) {
+ closer.OnClose(socket, error);
+ });
// Create server and listen.
std::unique_ptr<Socket> server =
diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc
index 9281474..e76821d 100644
--- a/rtc_base/ssl_adapter_unittest.cc
+++ b/rtc_base/ssl_adapter_unittest.cc
@@ -73,8 +73,10 @@
ssl_adapter_->SignalReadEvent.connect(
this, &SSLAdapterTestDummy::OnSSLAdapterReadEvent);
- ssl_adapter_->SignalCloseEvent.connect(
- this, &SSLAdapterTestDummy::OnSSLAdapterCloseEvent);
+ ssl_adapter_->SubscribeCloseEvent(
+ [this](webrtc::Socket* socket, int error) {
+ OnSSLAdapterCloseEvent(socket, error);
+ });
ssl_adapter_->SetRole(role);
}
diff --git a/rtc_base/test_utils.h b/rtc_base/test_utils.h
index 53a7a03..e0c81c1 100644
--- a/rtc_base/test_utils.h
+++ b/rtc_base/test_utils.h
@@ -43,18 +43,22 @@
~StreamSink() override;
void Monitor(Socket* socket) {
- socket->SignalConnectEvent.connect(this, &StreamSink::OnConnectEvent);
+ socket->SubscribeConnectEvent(
+ this, [this](Socket* socket) { OnConnectEvent(socket); });
socket->SignalReadEvent.connect(this, &StreamSink::OnReadEvent);
socket->SignalWriteEvent.connect(this, &StreamSink::OnWriteEvent);
- socket->SignalCloseEvent.connect(this, &StreamSink::OnCloseEvent);
+ socket->SubscribeCloseEvent(this, [this](Socket* socket, int error) {
+ OnCloseEvent(socket, error);
+ });
// In case you forgot to unmonitor a previous object with this address
events_.erase(socket);
}
void Unmonitor(Socket* socket) {
- socket->SignalConnectEvent.disconnect(this);
+ socket->UnsubscribeConnectEvent(this);
socket->SignalReadEvent.disconnect(this);
socket->SignalWriteEvent.disconnect(this);
- socket->SignalCloseEvent.disconnect(this);
+ socket->UnsubscribeCloseEvent(this);
+
events_.erase(socket);
}
bool Check(Socket* socket, StreamSinkEvent event, bool reset = true) {
diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc
index e718f4c..c86aee9 100644
--- a/rtc_base/virtual_socket_server.cc
+++ b/rtc_base/virtual_socket_server.cc
@@ -457,7 +457,7 @@
safety->socket_.SignalReadEvent(&safety->socket_);
break;
case Signal::kConnectEvent:
- safety->socket_.SignalConnectEvent(&safety->socket_);
+ safety->socket_.NotifyConnectEvent(&safety->socket_);
break;
}
};
@@ -509,7 +509,7 @@
int error_to_signal = (socket->state_ == CS_CONNECTING) ? ECONNREFUSED : 0;
socket->state_ = CS_CLOSED;
socket->remote_addr_.Clear();
- socket->SignalCloseEvent(socket, error_to_signal);
+ socket->NotifyCloseEvent(socket, error_to_signal);
};
server_->msg_queue_->PostDelayedTask(std::move(task), delay);
}
@@ -787,7 +787,7 @@
return false;
}
// Signal the close event on the local connection first.
- socket->SignalCloseEvent(socket, 0);
+ socket->NotifyCloseEvent(socket, 0);
// Trigger the remote connection's close event.
socket->Close();