[Sheriff] Revert "Remove MessageHandler[AutoCleanup] dependency from StreamInterface."
This reverts commit eb79dd9ffdc41e4ca86803bfc1317e0961a8a8a6.
Reason for revert: breaks WebRTC roll into Chrome:
https://crrev.com/c/2445696
Sample failure:
https://ci.chromium.org/p/chromium/builders/try/linux-rel/506049
[ RUN ] PseudoTcpAdapterTest.DeleteOnConnected
Original change's description:
> Remove MessageHandler[AutoCleanup] dependency from StreamInterface.
>
> This includes relying on related types such as MessageData and
> PostEvent functionality inside the StreamInterface itself.
>
> This affects mostly tests but OpenSSLStreamAdapter
> requires special attention.
>
> Bug: webrtc:11988
> Change-Id: Ib5c895f1bdf77bb49e3162bd49718f8a98812d91
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/185505
> Commit-Queue: Tommi <tommi@webrtc.org>
> Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#32290}
TBR=kwiberg@webrtc.org,tommi@webrtc.org
Change-Id: I23d7a311a73c739eba872a21e6123235465c28cc
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:11988
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/186564
Commit-Queue: Marina Ciocea <marinaciocea@webrtc.org>
Reviewed-by: Marina Ciocea <marinaciocea@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#32299}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 49008a2..77bff8d 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -835,7 +835,6 @@
"system:no_unique_address",
"system:rtc_export",
"task_utils:pending_task_safety_flag",
- "task_utils:repeating_task",
"task_utils:to_queued_task",
"third_party/base64",
"third_party/sigslot",
@@ -1426,7 +1425,6 @@
"memory:fifo_buffer",
"synchronization:mutex",
"synchronization:synchronization_unittests",
- "task_utils:pending_task_safety_flag",
"task_utils:to_queued_task",
"third_party/sigslot",
]
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index 838fbc6..5c3dd0a 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -20,15 +20,12 @@
deps = [ "..:checks" ]
}
-# Test only utility.
-# TODO: Tag with `testonly = true` once all depending targets are correctly
-# tagged.
rtc_library("fifo_buffer") {
visibility = [
- ":unittests",
+ "../../p2p:rtc_p2p",
"..:rtc_base_tests_utils",
"..:rtc_base_unittests",
- "../../p2p:rtc_p2p", # This needs to be fixed.
+ ":unittests",
]
sources = [
"fifo_buffer.cc",
@@ -37,8 +34,6 @@
deps = [
"..:rtc_base",
"../synchronization:mutex",
- "../task_utils:pending_task_safety_flag",
- "../task_utils:to_queued_task",
]
}
diff --git a/rtc_base/memory/fifo_buffer.cc b/rtc_base/memory/fifo_buffer.cc
index 3fbea8d..49e9267 100644
--- a/rtc_base/memory/fifo_buffer.cc
+++ b/rtc_base/memory/fifo_buffer.cc
@@ -104,7 +104,7 @@
// if we were full before, and now we're not, post an event
if (!was_writable && copy > 0) {
- PostEvent(SE_WRITE, 0);
+ PostEvent(owner_, SE_WRITE, 0);
}
}
return result;
@@ -129,7 +129,7 @@
// if we didn't have any data to read before, and now we do, post an event
if (!was_readable && copy > 0) {
- PostEvent(SE_READ, 0);
+ PostEvent(owner_, SE_READ, 0);
}
}
return result;
@@ -155,7 +155,7 @@
read_position_ = (read_position_ + size) % buffer_length_;
data_length_ -= size;
if (!was_writable && size > 0) {
- PostEvent(SE_WRITE, 0);
+ PostEvent(owner_, SE_WRITE, 0);
}
}
@@ -185,7 +185,7 @@
const bool was_readable = (data_length_ > 0);
data_length_ += size;
if (!was_readable && size > 0) {
- PostEvent(SE_READ, 0);
+ PostEvent(owner_, SE_READ, 0);
}
}
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index bf2edf6..04c4cbf 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -15,8 +15,6 @@
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/to_queued_task.h"
namespace rtc {
@@ -100,12 +98,6 @@
bool GetWriteRemaining(size_t* size) const;
private:
- void PostEvent(int events, int err) {
- owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
- SignalEvent(this, events, err);
- }));
- }
-
// Helper method that implements ReadOffset. Caller must acquire a lock
// when calling this method.
StreamResult ReadOffsetLocked(void* buffer,
@@ -122,8 +114,6 @@
size_t* bytes_written)
RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
- webrtc::ScopedTaskSafety task_safety_;
-
// keeps the opened/closed state of the stream
StreamState state_ RTC_GUARDED_BY(mutex_);
// the allocated buffer
@@ -135,7 +125,7 @@
// offset to the readable data
size_t read_position_ RTC_GUARDED_BY(mutex_);
// stream callbacks are dispatched on this thread
- Thread* const owner_;
+ Thread* owner_;
// object lock
mutable webrtc::Mutex mutex_;
RTC_DISALLOW_COPY_AND_ASSIGN(FifoBuffer);
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index 5790b1b..175160c 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -35,7 +35,6 @@
#include "rtc_base/openssl_identity.h"
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/stream.h"
-#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/thread.h"
#include "rtc_base/time_utils.h"
#include "system_wrappers/include/field_trial.h"
@@ -284,7 +283,6 @@
OpenSSLStreamAdapter::OpenSSLStreamAdapter(
std::unique_ptr<StreamInterface> stream)
: SSLStreamAdapter(std::move(stream)),
- owner_(rtc::Thread::Current()),
state_(SSL_NONE),
role_(SSL_CLIENT),
ssl_read_needs_write_(false),
@@ -298,7 +296,6 @@
support_legacy_tls_protocols_flag_(ShouldAllowLegacyTLSProtocols()) {}
OpenSSLStreamAdapter::~OpenSSLStreamAdapter() {
- timeout_task_.Stop();
Cleanup(0);
}
@@ -804,33 +801,6 @@
}
}
-void OpenSSLStreamAdapter::PostEvent(int events, int err) {
- owner_->PostTask(webrtc::ToQueuedTask(
- task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
-}
-
-void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
- // We need to accept 0 delay here as well as >0 delay, because
- // DTLSv1_get_timeout seems to frequently return 0 ms.
- RTC_DCHECK_GE(delay_ms, 0);
- RTC_DCHECK(!timeout_task_.Running());
-
- timeout_task_ = webrtc::RepeatingTaskHandle::DelayedStart(
- owner_, webrtc::TimeDelta::Millis(delay_ms),
- [flag = task_safety_.flag(), this]() {
- if (flag->alive()) {
- RTC_DLOG(LS_INFO) << "DTLS timeout expired";
- timeout_task_.Stop();
- DTLSv1_handle_timeout(ssl_);
- ContinueSSL();
- } else {
- RTC_NOTREACHED();
- }
- // This callback will never run again (stopped above).
- return webrtc::TimeDelta::PlusInfinity();
- });
-}
-
int OpenSSLStreamAdapter::BeginSSL() {
RTC_DCHECK(state_ == SSL_CONNECTING);
// The underlying stream has opened.
@@ -881,7 +851,7 @@
RTC_DCHECK(state_ == SSL_CONNECTING);
// Clear the DTLS timer
- timeout_task_.Stop();
+ Thread::Current()->Clear(this, MSG_TIMEOUT);
const int code = (role_ == SSL_CLIENT) ? SSL_connect(ssl_) : SSL_accept(ssl_);
const int ssl_error = SSL_get_error(ssl_, code);
@@ -913,7 +883,9 @@
struct timeval timeout;
if (DTLSv1_get_timeout(ssl_, &timeout)) {
int delay = timeout.tv_sec * 1000 + timeout.tv_usec / 1000;
- SetTimeout(delay);
+
+ Thread::Current()->PostDelayed(RTC_FROM_HERE, delay, this, MSG_TIMEOUT,
+ 0);
}
} break;
@@ -990,7 +962,18 @@
peer_cert_chain_.reset();
// Clear the DTLS timer
- timeout_task_.Stop();
+ Thread::Current()->Clear(this, MSG_TIMEOUT);
+}
+
+void OpenSSLStreamAdapter::OnMessage(Message* msg) {
+ // Process our own messages and then pass others to the superclass
+ if (MSG_TIMEOUT == msg->message_id) {
+ RTC_DLOG(LS_INFO) << "DTLS timeout expired";
+ DTLSv1_handle_timeout(ssl_);
+ ContinueSSL();
+ } else {
+ StreamInterface::OnMessage(msg);
+ }
}
SSL_CTX* OpenSSLStreamAdapter::SetupSSLContext() {
diff --git a/rtc_base/openssl_stream_adapter.h b/rtc_base/openssl_stream_adapter.h
index fbfccd684..d4cde84 100644
--- a/rtc_base/openssl_stream_adapter.h
+++ b/rtc_base/openssl_stream_adapter.h
@@ -26,8 +26,6 @@
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
#include "rtc_base/system/rtc_export.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/repeating_task.h"
namespace rtc {
@@ -147,8 +145,7 @@
SSL_CLOSED // Clean close
};
- void PostEvent(int events, int err);
- void SetTimeout(int delay_ms);
+ enum { MSG_TIMEOUT = MSG_MAX + 1 };
// The following three methods return 0 on success and a negative
// error code on failure. The error code may be from OpenSSL or -1
@@ -172,6 +169,9 @@
void Error(const char* context, int err, uint8_t alert, bool signal);
void Cleanup(uint8_t alert);
+ // Override MessageHandler
+ void OnMessage(Message* msg) override;
+
// Flush the input buffers by reading left bytes (for DTLS)
void FlushInput(unsigned int left);
@@ -192,10 +192,6 @@
!peer_certificate_digest_value_.empty();
}
- rtc::Thread* const owner_;
- webrtc::ScopedTaskSafety task_safety_;
- webrtc::RepeatingTaskHandle timeout_task_;
-
SSLState state_;
SSLRole role_;
int ssl_error_code_; // valid when state_ == SSL_ERROR or SSL_CLOSED
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index 1ba2f3e..bfbaf0f 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -26,8 +26,6 @@
#include "rtc_base/ssl_identity.h"
#include "rtc_base/ssl_stream_adapter.h"
#include "rtc_base/stream.h"
-#include "rtc_base/task_utils/pending_task_safety_flag.h"
-#include "rtc_base/task_utils/to_queued_task.h"
#include "test/field_trial.h"
using ::testing::Combine;
@@ -216,15 +214,7 @@
out_->Close();
}
- private:
- void PostEvent(int events, int err) {
- thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
- SignalEvent(this, events, err);
- }));
- }
-
- webrtc::ScopedTaskSafety task_safety_;
- rtc::Thread* const thread_ = rtc::Thread::Current();
+ protected:
SSLStreamAdapterTestBase* test_base_;
const std::string side_;
rtc::StreamInterface* in_;
@@ -286,17 +276,10 @@
protected:
void NotifyReadableForTest() { PostEvent(rtc::SE_READ, 0); }
+
void NotifyWritableForTest() { PostEvent(rtc::SE_WRITE, 0); }
private:
- void PostEvent(int events, int err) {
- thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
- SignalEvent(this, events, err);
- }));
- }
-
- rtc::Thread* const thread_ = rtc::Thread::Current();
- webrtc::ScopedTaskSafety task_safety_;
rtc::BufferQueue buffer_;
};
diff --git a/rtc_base/stream.cc b/rtc_base/stream.cc
index ee72f8d..1b0a4d7 100644
--- a/rtc_base/stream.cc
+++ b/rtc_base/stream.cc
@@ -24,6 +24,7 @@
///////////////////////////////////////////////////////////////////////////////
// StreamInterface
///////////////////////////////////////////////////////////////////////////////
+StreamInterface::~StreamInterface() {}
StreamResult StreamInterface::WriteAll(const void* data,
size_t data_len,
@@ -43,12 +44,29 @@
return result;
}
+void StreamInterface::PostEvent(Thread* t, int events, int err) {
+ t->Post(RTC_FROM_HERE, this, MSG_POST_EVENT,
+ new StreamEventData(events, err));
+}
+
+void StreamInterface::PostEvent(int events, int err) {
+ PostEvent(Thread::Current(), events, err);
+}
+
bool StreamInterface::Flush() {
return false;
}
StreamInterface::StreamInterface() {}
+void StreamInterface::OnMessage(Message* msg) {
+ if (MSG_POST_EVENT == msg->message_id) {
+ StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
+ SignalEvent(this, pe->events, pe->error);
+ delete msg->pdata;
+ }
+}
+
///////////////////////////////////////////////////////////////////////////////
// StreamAdapterInterface
///////////////////////////////////////////////////////////////////////////////
diff --git a/rtc_base/stream.h b/rtc_base/stream.h
index 9bf11a2..940bfb4 100644
--- a/rtc_base/stream.h
+++ b/rtc_base/stream.h
@@ -48,9 +48,16 @@
// SE_WRITE: Data can be written, so Write is likely to not return SR_BLOCK
enum StreamEvent { SE_OPEN = 1, SE_READ = 2, SE_WRITE = 4, SE_CLOSE = 8 };
-class RTC_EXPORT StreamInterface {
+struct StreamEventData : public MessageData {
+ int events, error;
+ StreamEventData(int ev, int er) : events(ev), error(er) {}
+};
+
+class RTC_EXPORT StreamInterface : public MessageHandlerAutoCleanup {
public:
- virtual ~StreamInterface() {}
+ enum { MSG_POST_EVENT = 0xF1F1, MSG_MAX = MSG_POST_EVENT };
+
+ ~StreamInterface() override;
virtual StreamState GetState() const = 0;
@@ -89,6 +96,13 @@
// certain events will be raised in the future.
sigslot::signal3<StreamInterface*, int, int> SignalEvent;
+ // Like calling SignalEvent, but posts a message to the specified thread,
+ // which will call SignalEvent. This helps unroll the stack and prevent
+ // re-entrancy.
+ void PostEvent(Thread* t, int events, int err);
+ // Like the aforementioned method, but posts to the current thread.
+ void PostEvent(int events, int err);
+
// Return true if flush is successful.
virtual bool Flush();
@@ -111,6 +125,9 @@
protected:
StreamInterface();
+ // MessageHandler Interface
+ void OnMessage(Message* msg) override;
+
private:
RTC_DISALLOW_COPY_AND_ASSIGN(StreamInterface);
};