Introduce StreamInterface::FireEvent for firing stream events
This is a step towards removing StreamInterface::SignalEvent.
Downstream dependency will need to be updated to call FireEvent()
before further changes can land in webrtc.
Bug: webrtc:11943
Change-Id: Ia7d3f1c43fda52b7cf5bfa082aef3f462553cd67
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/347884
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#42143}
diff --git a/p2p/base/dtls_transport.cc b/p2p/base/dtls_transport.cc
index 42353e2..f6f6847 100644
--- a/p2p/base/dtls_transport.cc
+++ b/p2p/base/dtls_transport.cc
@@ -79,7 +79,7 @@
rtc::StreamResult StreamInterfaceChannel::Read(rtc::ArrayView<uint8_t> buffer,
size_t& read,
int& error) {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
if (state_ == rtc::SS_CLOSED)
return rtc::SR_EOS;
@@ -97,7 +97,7 @@
rtc::ArrayView<const uint8_t> data,
size_t& written,
int& error) {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
// Always succeeds, since this is an unreliable transport anyway.
// TODO(zhihuang): Should this block if ice_transport_'s temporarily
// unwritable?
@@ -109,7 +109,7 @@
}
bool StreamInterfaceChannel::OnPacketReceived(const char* data, size_t size) {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
if (packets_.size() > 0) {
RTC_LOG(LS_WARNING) << "Packet already in queue.";
}
@@ -121,17 +121,17 @@
// packet currently in packets_.
RTC_LOG(LS_ERROR) << "Failed to write packet to queue.";
}
- SignalEvent(this, rtc::SE_READ, 0);
+ FireEvent(rtc::SE_READ, 0);
return ret;
}
rtc::StreamState StreamInterfaceChannel::GetState() const {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
return state_;
}
void StreamInterfaceChannel::Close() {
- RTC_DCHECK_RUN_ON(&sequence_checker_);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
packets_.Clear();
state_ = rtc::SS_CLOSED;
}
diff --git a/p2p/base/dtls_transport.h b/p2p/base/dtls_transport.h
index f479325..109dbf5 100644
--- a/p2p/base/dtls_transport.h
+++ b/p2p/base/dtls_transport.h
@@ -58,10 +58,9 @@
int& error) override;
private:
- RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker sequence_checker_;
IceTransportInternal* const ice_transport_; // owned by DtlsTransport
- rtc::StreamState state_ RTC_GUARDED_BY(sequence_checker_);
- rtc::BufferQueue packets_ RTC_GUARDED_BY(sequence_checker_);
+ rtc::StreamState state_ RTC_GUARDED_BY(callback_sequence_);
+ rtc::BufferQueue packets_ RTC_GUARDED_BY(callback_sequence_);
};
// This class provides a DTLS SSLStreamAdapter inside a TransportChannel-style
@@ -235,7 +234,7 @@
// Sets the DTLS state, signaling if necessary.
void set_dtls_state(webrtc::DtlsTransportState state);
- webrtc::SequenceChecker thread_checker_;
+ RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker thread_checker_;
const int component_;
webrtc::DtlsTransportState dtls_state_ = webrtc::DtlsTransportState::kNew;
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index c582863..69b4343 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -1446,8 +1446,11 @@
deps = [
":buffer",
":checks",
+ ":logging",
":threading",
"../api:array_view",
+ "../api:sequence_checker",
+ "system:no_unique_address",
"system:rtc_export",
"third_party/sigslot",
]
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index a225c68..658a3c7 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -78,9 +78,11 @@
private:
void PostEvent(int events, int err) {
- owner_->PostTask(webrtc::SafeTask(
- task_safety_.flag(),
- [this, events, err]() { SignalEvent(this, events, err); }));
+ owner_->PostTask(
+ webrtc::SafeTask(task_safety_.flag(), [this, events, err]() {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ FireEvent(events, err);
+ }));
}
// Helper method that implements Read. Caller must acquire a lock
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index e2c242b..357510b 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -744,6 +744,7 @@
void OpenSSLStreamAdapter::OnEvent(StreamInterface* stream,
int events,
int err) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
int events_to_signal = 0;
int signal_error = 0;
RTC_DCHECK(stream == stream_.get());
@@ -800,13 +801,14 @@
if (events_to_signal) {
// Note that the adapter presents itself as the origin of the stream events,
// since users of the adapter may not recognize the adapted object.
- SignalEvent(this, events_to_signal, signal_error);
+ FireEvent(events_to_signal, signal_error);
}
}
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
- SignalEvent(this, events, err);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ FireEvent(events, err);
}));
}
@@ -885,8 +887,9 @@
}
int OpenSSLStreamAdapter::ContinueSSL() {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DLOG(LS_VERBOSE) << "ContinueSSL";
- RTC_DCHECK(state_ == SSL_CONNECTING);
+ RTC_DCHECK_EQ(state_, SSL_CONNECTING);
// Clear the DTLS timer
timeout_task_.Stop();
@@ -911,7 +914,7 @@
// The caller of ContinueSSL may be the same object listening for these
// events and may not be prepared for reentrancy.
// PostEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
- SignalEvent(this, SE_OPEN | SE_READ | SE_WRITE, 0);
+ FireEvent(SE_OPEN | SE_READ | SE_WRITE, 0);
}
break;
@@ -950,13 +953,14 @@
int err,
uint8_t alert,
bool signal) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_LOG(LS_WARNING) << "OpenSSLStreamAdapter::Error(" << context << ", "
<< err << ", " << static_cast<int>(alert) << ")";
state_ = SSL_ERROR;
ssl_error_code_ = err;
Cleanup(alert);
if (signal) {
- SignalEvent(this, SE_CLOSE, err);
+ FireEvent(SE_CLOSE, err);
}
}
diff --git a/rtc_base/ssl_adapter_unittest.cc b/rtc_base/ssl_adapter_unittest.cc
index d0aedb8..084594f 100644
--- a/rtc_base/ssl_adapter_unittest.cc
+++ b/rtc_base/ssl_adapter_unittest.cc
@@ -216,21 +216,25 @@
private:
void OnConnectEvent(rtc::Socket* socket) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
- SignalEvent(this, rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
+ FireEvent(rtc::SE_OPEN | rtc::SE_READ | rtc::SE_WRITE, 0);
}
void OnReadEvent(rtc::Socket* socket) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
- SignalEvent(this, rtc::SE_READ, 0);
+ FireEvent(rtc::SE_READ, 0);
}
void OnWriteEvent(rtc::Socket* socket) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
- SignalEvent(this, rtc::SE_WRITE, 0);
+ FireEvent(rtc::SE_WRITE, 0);
}
void OnCloseEvent(rtc::Socket* socket, int err) {
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
RTC_DCHECK_EQ(socket, socket_.get());
- SignalEvent(this, rtc::SE_CLOSE, err);
+ FireEvent(rtc::SE_CLOSE, err);
}
std::unique_ptr<rtc::Socket> socket_;
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index 3389218..fc6532c 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -223,7 +223,8 @@
private:
void PostEvent(int events, int err) {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
- SignalEvent(this, events, err);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ FireEvent(events, err);
}));
}
@@ -293,7 +294,8 @@
private:
void PostEvent(int events, int err) {
thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
- SignalEvent(this, events, err);
+ RTC_DCHECK_RUN_ON(&callback_sequence_);
+ FireEvent(events, err);
}));
}
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index e02349a..4b2236a 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -14,7 +14,9 @@
#include <memory>
#include "api/array_view.h"
+#include "api/sequence_checker.h"
#include "rtc_base/buffer.h"
+#include "rtc_base/system/no_unique_address.h"
#include "rtc_base/system/rtc_export.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
@@ -121,6 +123,16 @@
protected:
StreamInterface();
+
+ // Utility function for derived classes.
+ void FireEvent(int stream_events, int err) RTC_RUN_ON(&callback_sequence_) {
+ // TODO(tommi): This is for backwards compatibility only while `SignalEvent`
+ // is being replaced by `SetEventHandler`.
+ SignalEvent(this, stream_events, err);
+ }
+
+ RTC_NO_UNIQUE_ADDRESS webrtc::SequenceChecker callback_sequence_{
+ webrtc::SequenceChecker::kDetached};
};
} // namespace rtc