Remove MessageHandler[AutoCleanup] dependency from StreamInterface.
This includes relying on related types such as MessageData and
PostEvent functionality inside the StreamInterface itself.
This affects mostly tests but OpenSSLStreamAdapter
requires special attention.
Bug: webrtc:11988
Change-Id: Ib5c895f1bdf77bb49e3162bd49718f8a98812d91
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185505
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32290}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 77bff8d..49008a2 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -835,6 +835,7 @@
"system:no_unique_address",
"system:rtc_export",
"task_utils:pending_task_safety_flag",
+ "task_utils:repeating_task",
"task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
@@ -1425,6 +1426,7 @@
"memory:fifo_buffer",
"synchronization:mutex",
"synchronization:synchronization_unittests",
+ "task_utils:pending_task_safety_flag",
"task_utils:to_queued_task",
"third_party/sigslot",
]
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index 5c3dd0a..838fbc6 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -20,12 +20,15 @@
deps = [ "..:checks" ]
}
+# Test only utility.
+# TODO: Tag with `testonly = true` once all depending targets are correctly
+# tagged.
rtc_library("fifo_buffer") {
visibility = [
- "../../p2p:rtc_p2p",
+ ":unittests",
"..:rtc_base_tests_utils",
"..:rtc_base_unittests",
- ":unittests",
+ "../../p2p:rtc_p2p", # This needs to be fixed.
]
sources = [
"fifo_buffer.cc",
@@ -34,6 +37,8 @@
deps = [
"..:rtc_base",
"../synchronization:mutex",
+ "../task_utils:pending_task_safety_flag",
+ "../task_utils:to_queued_task",
]
}
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
index 49e9267..3fbea8d 100644
--- a/rtc_base/memory/fifo_buffer.cc
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -104,7 +104,7 @@
// if we were full before, and now we're not, post an event
if (!was_writable && copy > 0) {
- PostEvent(owner_, SE_WRITE, 0);
+ PostEvent(SE_WRITE, 0);
}
}
return result;
@@ -129,7 +129,7 @@
// if we didn't have any data to read before, and now we do, post an event
if (!was_readable && copy > 0) {
- PostEvent(owner_, SE_READ, 0);
+ PostEvent(SE_READ, 0);
}
}
return result;
@@ -155,7 +155,7 @@
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
if (!was_writable && size > 0) {
- PostEvent(owner_, SE_WRITE, 0);
+ PostEvent(SE_WRITE, 0);
}
}
@@ -185,7 +185,7 @@
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {
- PostEvent(owner_, SE_READ, 0);
+ PostEvent(SE_READ, 0);
}
}
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index 04c4cbf..bf2edf6 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -15,6 +15,8 @@
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/to_queued_task.h"
namespace rtc {
@@ -98,6 +100,12 @@
bool GetWriteRemaining(size_t* size) const;
private:
+ void PostEvent(int events, int err) {
+ owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
+ SignalEvent(this, events, err);
+ }));
+ }
+
// Helper method that implements ReadOffset. Caller must acquire a lock
// when calling this method.
StreamResult ReadOffsetLocked(void* buffer,
@@ -114,6 +122,8 @@
size_t* bytes_written)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ webrtc::ScopedTaskSafety task_safety_;
+
// keeps the opened/closed state of the stream
StreamState state_ RTC_GUARDED_BY(mutex_);
// the allocated buffer
@@ -125,7 +135,7 @@
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(mutex_);
// stream callbacks are dispatched on this thread
- Thread* owner_;
+ Thread* const owner_;
// object lock
mutable webrtc::Mutex mutex_;
RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index 0f8c1fc..426100b 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -35,6 +35,7 @@
#include "rtc_base/openssl_identity.h"
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/stream.h"
+#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
@@ -51,7 +52,6 @@
namespace rtc {
namespace {
-
// SRTP cipher suite table. |internal_name| is used to construct a
// colon-separated profile strings which is needed by
// SSL_CTX_set_tlsext_use_srtp().
@@ -284,6 +284,7 @@
OpenSSLStreamAdapter::OpenSSLStreamAdapter(
std::unique_ptr<StreamInterface> stream)
: SSLStreamAdapter(std::move(stream)),
+ owner_(rtc::Thread::Current()),
state_(SSL_NONE),
role_(SSL_CLIENT),
ssl_read_needs_write_(false),
@@ -297,6 +298,7 @@
support_legacy_tls_protocols_flag_(ShouldAllowLegacyTLSProtocols()) {}
OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
+ timeout_task_.Stop();
Cleanup(0);
}
@@ -802,6 +804,33 @@
}
}
+void OpenSSLStreamAdapter::PostEvent(int events, int err) {
+ owner_->PostTask(webrtc::ToQueuedTask(
+ task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
+}
+
+void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
+ // We need to accept 0 delay here as well as >0 delay, because
+ // DTLSv1_get_timeout seems to frequently return 0 ms.
+ RTC_DCHECK_GE(delay_ms, 0);
+ RTC_DCHECK(!timeout_task_.Running());
+
+ timeout_task_ = webrtc::RepeatingTaskHandle::DelayedStart(
+ owner_, webrtc::TimeDelta::Millis(delay_ms),
+ [flag = task_safety_.flag(), this]() {
+ if (flag->alive()) {
+ RTC_LOG(LS_INFO) << "DTLS timeout expired";
+ timeout_task_.Stop();
+ DTLSv1_handle_timeout(ssl_);
+ ContinueSSL();
+ } else {
+ RTC_NOTREACHED();
+ }
+ // This callback will never run again (stopped above).
+ return webrtc::TimeDelta::PlusInfinity();
+ });
+}
+
int OpenSSLStreamAdapter::BeginSSL() {
RTC_DCHECK(state_ == SSL_CONNECTING);
// The underlying stream has opened.
@@ -852,7 +881,7 @@
RTC_DCHECK(state_ == SSL_CONNECTING);
// Clear the DTLS timer
- Thread::Current()->Clear(this, MSG_TIMEOUT);
+ timeout_task_.Stop();
const int code = (role_ == SSL_CLIENT) ? SSL_connect(ssl_) : SSL_accept(ssl_);
const int ssl_error = SSL_get_error(ssl_, code);
@@ -884,9 +913,7 @@
struct timeval timeout;
if (DTLSv1_get_timeout(ssl_, &timeout)) {
int delay = timeout.tv_sec * 1000 + timeout.tv_usec / 1000;
-
- Thread::Current()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_TIMEOUT,
- 0);
+ SetTimeout(delay);
}
} break;
@@ -963,18 +990,7 @@
peer_cert_chain_.reset();
// Clear the DTLS timer
- Thread::Current()->Clear(this, MSG_TIMEOUT);
-}
-
-void OpenSSLStreamAdapter::OnMessage(Message* msg) {
- // Process our own messages and then pass others to the superclass
- if (MSG_TIMEOUT == msg->message_id) {
- RTC_LOG(LS_INFO) << "DTLS timeout expired";
- DTLSv1_handle_timeout(ssl_);
- ContinueSSL();
- } else {
- StreamInterface::OnMessage(msg);
- }
+ timeout_task_.Stop();
}
SSL_CTX* OpenSSLStreamAdapter::SetupSSLContext() {
diff --git a/rtc_base/openssl_stream_adapter.h b/rtc_base/openssl_stream_adapter.h
index d4cde84..fbfccd684 100644
--- a/rtc_base/openssl_stream_adapter.h
+++ b/rtc_base/openssl_stream_adapter.h
@@ -26,6 +26,8 @@
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/system/rtc_export.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/repeating_task.h"
namespace rtc {
@@ -145,7 +147,8 @@
SSL_CLOSED // Clean close
};
- enum { MSG_TIMEOUT = MSG_MAX + 1 };
+ void PostEvent(int events, int err);
+ void SetTimeout(int delay_ms);
// The following three methods return 0 on success and a negative
// error code on failure. The error code may be from OpenSSL or -1
@@ -169,9 +172,6 @@
void Error(const char* context, int err, uint8_t alert, bool signal);
void Cleanup(uint8_t alert);
- // Override MessageHandler
- void OnMessage(Message* msg) override;
-
// Flush the input buffers by reading left bytes (for DTLS)
void FlushInput(unsigned int left);
@@ -192,6 +192,10 @@
!peer_certificate_digest_value_.empty();
}
+ rtc::Thread* const owner_;
+ webrtc::ScopedTaskSafety task_safety_;
+ webrtc::RepeatingTaskHandle timeout_task_;
+
SSLState state_;
SSLRole role_;
int ssl_error_code_; // valid when state_ == SSL_ERROR or SSL_CLOSED
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index bfbaf0f..1ba2f3e 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -26,6 +26,8 @@
#include "rtc_base/ssl_identity.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
+#include "rtc_base/task_utils/pending_task_safety_flag.h"
+#include "rtc_base/task_utils/to_queued_task.h"
#include "test/field_trial.h"
using ::testing::Combine;
@@ -214,7 +216,15 @@
out_->Close();
}
- protected:
+ private:
+ void PostEvent(int events, int err) {
+ thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
+ SignalEvent(this, events, err);
+ }));
+ }
+
+ webrtc::ScopedTaskSafety task_safety_;
+ rtc::Thread* const thread_ = rtc::Thread::Current();
SSLStreamAdapterTestBase* test_base_;
const std::string side_;
rtc::StreamInterface* in_;
@@ -276,10 +286,17 @@
protected:
void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
-
void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
private:
+ void PostEvent(int events, int err) {
+ thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
+ SignalEvent(this, events, err);
+ }));
+ }
+
+ rtc::Thread* const thread_ = rtc::Thread::Current();
+ webrtc::ScopedTaskSafety task_safety_;
rtc::BufferQueue buffer_;
};
diff --git a/rtc_base/stream.cc b/rtc_base/stream.cc
index 1b0a4d7..ee72f8d 100644
--- a/rtc_base/stream.cc
+++ b/rtc_base/stream.cc
@@ -24,7 +24,6 @@
///////////////////////////////////////////////////////////////////////////////
// StreamInterface
///////////////////////////////////////////////////////////////////////////////
-StreamInterface::~StreamInterface() {}
StreamResult StreamInterface::WriteAll(const void* data,
size_t data_len,
@@ -44,29 +43,12 @@
return result;
}
-void StreamInterface::PostEvent(Thread* t, int events, int err) {
- t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
- new StreamEventData(events, err));
-}
-
-void StreamInterface::PostEvent(int events, int err) {
- PostEvent(Thread::Current(), events, err);
-}
-
bool StreamInterface::Flush() {
return false;
}
StreamInterface::StreamInterface() {}
-void StreamInterface::OnMessage(Message* msg) {
- if (MSG_POST_EVENT == msg->message_id) {
- StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
- SignalEvent(this, pe->events, pe->error);
- delete msg->pdata;
- }
-}
-
///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface
///////////////////////////////////////////////////////////////////////////////
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index 940bfb4..9bf11a2 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -48,16 +48,9 @@
// SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 };
-struct StreamEventData : public MessageData {
- int events, error;
- StreamEventData(int ev, int er) : events(ev), error(er) {}
-};
-
-class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
+class RTC_EXPORT StreamInterface {
public:
- enum { MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT };
-
- ~StreamInterface() override;
+ virtual ~StreamInterface() {}
virtual StreamState GetState() const = 0;
@@ -96,13 +89,6 @@
// certain events will be raised in the future.
sigslot::signal3<StreamInterface*, int, int> SignalEvent;
- // Like calling SignalEvent, but posts a message to the specified thread,
- // which will call SignalEvent. This helps unroll the stack and prevent
- // re-entrancy.
- void PostEvent(Thread* t, int events, int err);
- // Like the aforementioned method, but posts to the current thread.
- void PostEvent(int events, int err);
-
// Return true if flush is successful.
virtual bool Flush();
@@ -125,9 +111,6 @@
protected:
StreamInterface();
- // MessageHandler Interface
- void OnMessage(Message* msg) override;
-
private:
RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
};