in rtc::Thread introduce Invoke without rtc::Location parameter
To reduce usage of rtc::MessageHandler, hide rtc::Thread::Send into private section with intention to deprecate it in favor of the new Invoke function.
Bug: webrtc:9702, webrtc:11318
Change-Id: Ib4c26f9abc361e05a45b2a91929af58ab160b3f0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/274166
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38036}
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index 1a43008..9c36749 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -941,10 +941,8 @@
}
}
-void Thread::InvokeInternal(const Location& posted_from,
- rtc::FunctionView<void()> functor) {
- TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
- "src_func", posted_from.function_name());
+void Thread::BlockingCall(rtc::FunctionView<void()> functor) {
+ TRACE_EVENT0("webrtc", "Thread::BlockingCall");
class FunctorMessageHandler : public MessageHandler {
public:
@@ -956,7 +954,7 @@
rtc::FunctionView<void()> functor_;
} handler(functor);
- Send(posted_from, &handler);
+ Send(/*posted_from=*/{}, &handler, /*id=*/0, /*pdata=*/nullptr);
}
// Called by the ThreadManager when being set as the current thread.
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 52df554..ad286a8 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -20,6 +20,7 @@
#include <set>
#include <string>
#include <type_traits>
+#include <utility>
#include <vector>
#include "absl/strings/string_view.h"
@@ -47,8 +48,8 @@
#endif
#if RTC_DCHECK_IS_ON
-// Counts how many blocking Thread::Invoke or Thread::Send calls are made from
-// within a scope and logs the number of blocking calls at the end of the scope.
+// Counts how many `Thread::BlockingCall` are made from within a scope and logs
+// the number of blocking calls at the end of the scope.
#define RTC_LOG_THREAD_BLOCK_COUNT() \
rtc::Thread::ScopedCountBlockingCalls blocked_call_count_printer( \
[func = __func__](uint32_t actual_block, uint32_t could_block) { \
@@ -202,8 +203,8 @@
static std::unique_ptr<Thread> Create();
static Thread* Current();
- // Used to catch performance regressions. Use this to disallow blocking calls
- // (Invoke) for a given scope. If a synchronous call is made while this is in
+ // Used to catch performance regressions. Use this to disallow BlockingCall
+ // for a given scope. If a synchronous call is made while this is in
// effect, an assert will be triggered.
// Note that this is a single threaded class.
class ScopedDisallowBlockingCalls {
@@ -310,7 +311,7 @@
bool SetName(absl::string_view name, const void* obj);
// Sets the expected processing time in ms. The thread will write
- // log messages when Invoke() takes more time than this.
+ // log messages when Dispatch() takes more time than this.
// Default is 50 ms.
void SetDispatchWarningMs(int deadline);
@@ -328,41 +329,34 @@
// ProcessMessages occasionally.
virtual void Run();
- virtual void Send(const Location& posted_from,
- MessageHandler* phandler,
- uint32_t id = 0,
- MessageData* pdata = nullptr);
-
- // Convenience method to invoke a functor on another thread. Caller must
- // provide the `ReturnT` template argument, which cannot (easily) be deduced.
- // Uses Send() internally, which blocks the current thread until execution
- // is complete.
- // Ex: bool result = thread.Invoke<bool>(RTC_FROM_HERE,
- // &MyFunctionReturningBool);
+ // Convenience method to invoke a functor on another thread.
+ // Blocks the current thread until execution is complete.
+ // Ex: thread.BlockingCall([&] { result = MyFunctionReturningBool(); });
// NOTE: This function can only be called when synchronous calls are allowed.
// See ScopedDisallowBlockingCalls for details.
- // NOTE: Blocking invokes are DISCOURAGED, consider if what you're doing can
+ // NOTE: Blocking calls are DISCOURAGED, consider if what you're doing can
// be achieved with PostTask() and callbacks instead.
- template <
- class ReturnT,
- typename = typename std::enable_if<!std::is_void<ReturnT>::value>::type>
- ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
+ virtual void BlockingCall(FunctionView<void()> functor);
+
+ template <typename Functor,
+ typename ReturnT = std::invoke_result_t<Functor>,
+ typename = typename std::enable_if_t<!std::is_void_v<ReturnT>>>
+ ReturnT BlockingCall(Functor&& functor) {
ReturnT result;
- InvokeInternal(posted_from, [functor, &result] { result = functor(); });
+ BlockingCall([&] { result = std::forward<Functor>(functor)(); });
return result;
}
- template <
- class ReturnT,
- typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
- void Invoke(const Location& posted_from, FunctionView<void()> functor) {
- InvokeInternal(posted_from, functor);
+ // Deprecated, use `BlockingCall` instead.
+ template <typename ReturnT>
+ ReturnT Invoke(const Location& posted_from, FunctionView<ReturnT()> functor) {
+ return BlockingCall(functor);
}
- // Allows invoke to specified `thread`. Thread never will be dereferenced and
- // will be used only for reference-based comparison, so instance can be safely
- // deleted. If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do
- // nothing.
+ // Allows BlockingCall to specified `thread`. Thread never will be
+ // dereferenced and will be used only for reference-based comparison, so
+ // instance can be safely deleted. If NDEBUG is defined and RTC_DCHECK_IS_ON
+ // is undefined do nothing.
void AllowInvokesToThread(Thread* thread);
// If NDEBUG is defined and RTC_DCHECK_IS_ON is undefined do nothing.
@@ -503,12 +497,13 @@
private:
static const int kSlowDispatchLoggingThreshold = 50; // 50 ms
- // TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it.
- // chromium's ThreadWrapper overrides it just to check it is never called.
- virtual bool Peek(Message* pmsg, int cms_wait) {
- RTC_DCHECK_NOTREACHED();
- return false;
- }
+ // TODO(bugs.webrtc.org/9702): Delete and move Send's implementation into
+ // `BlockingCall` when derived classes override `BlockingCall` instead.
+ virtual void Send(const Location& posted_from,
+ MessageHandler* phandler,
+ uint32_t id,
+ MessageData* pdata);
+
// Get() will process I/O until:
// 1) A message is available (returns true)
// 2) cmsWait seconds have elapsed (returns false)
@@ -539,9 +534,6 @@
// Return true if the thread is currently running.
bool IsRunning();
- void InvokeInternal(const Location& posted_from,
- rtc::FunctionView<void()> functor);
-
// Called by the ThreadManager when being set as the current thread.
void EnsureIsCurrentTaskQueue();
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index a888534..778b89a 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -143,77 +143,6 @@
Event* event_;
};
-// A bool wrapped in a mutex, to avoid data races. Using a volatile
-// bool should be sufficient for correct code ("eventual consistency"
-// between caches is sufficient), but we can't tell the compiler about
-// that, and then tsan complains about a data race.
-
-// See also discussion at
-// http://stackoverflow.com/questions/7223164/is-mutex-needed-to-synchronize-a-simple-flag-between-pthreads
-
-// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
-// the right thing to do, but those features are not yet allowed. Or
-// rtc::AtomicInt, if/when that is added. Since the use isn't
-// performance critical, use a plain critical section for the time
-// being.
-
-class AtomicBool {
- public:
- explicit AtomicBool(bool value = false) : flag_(value) {}
- AtomicBool& operator=(bool value) {
- webrtc::MutexLock scoped_lock(&mutex_);
- flag_ = value;
- return *this;
- }
- bool get() const {
- webrtc::MutexLock scoped_lock(&mutex_);
- return flag_;
- }
-
- private:
- mutable webrtc::Mutex mutex_;
- bool flag_;
-};
-
-// Function objects to test Thread::Invoke.
-struct FunctorA {
- int operator()() { return 42; }
-};
-class FunctorB {
- public:
- explicit FunctorB(AtomicBool* flag) : flag_(flag) {}
- void operator()() {
- if (flag_)
- *flag_ = true;
- }
-
- private:
- AtomicBool* flag_;
-};
-struct FunctorC {
- int operator()() {
- Thread::Current()->ProcessMessages(50);
- return 24;
- }
-};
-struct FunctorD {
- public:
- explicit FunctorD(AtomicBool* flag) : flag_(flag) {}
- FunctorD(FunctorD&&) = default;
-
- FunctorD(const FunctorD&) = delete;
- FunctorD& operator=(const FunctorD&) = delete;
-
- FunctorD& operator=(FunctorD&&) = default;
- void operator()() {
- if (flag_)
- *flag_ = true;
- }
-
- private:
- AtomicBool* flag_;
-};
-
// See: https://code.google.com/p/webrtc/issues/detail?id=2409
TEST(ThreadTest, DISABLED_Main) {
rtc::AutoThread main_thread;
@@ -276,9 +205,9 @@
// Test invoking on the current thread. This should not count as an 'actual'
// invoke, but should still count as an invoke that could block since we
- // that the call to Invoke serves a purpose in some configurations (and should
- // not be used a general way to call methods on the same thread).
- current.Invoke<void>(RTC_FROM_HERE, []() {});
+ // that the call to `BlockingCall` serves a purpose in some configurations
+ // (and should not be used a general way to call methods on the same thread).
+ current.BlockingCall([]() {});
EXPECT_EQ(0u, blocked_calls.GetBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetTotalBlockedCallCount());
@@ -286,7 +215,7 @@
// Create a new thread to invoke on.
auto thread = Thread::CreateWithSocketServer();
thread->Start();
- EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, []() { return 42; }));
+ EXPECT_EQ(42, thread->BlockingCall([]() { return 42; }));
EXPECT_EQ(1u, blocked_calls.GetBlockingCallCount());
EXPECT_EQ(1u, blocked_calls.GetCouldBeBlockingCallCount());
EXPECT_EQ(2u, blocked_calls.GetTotalBlockedCallCount());
@@ -307,7 +236,7 @@
[&](uint32_t actual_block, uint32_t could_block) {
was_called_back = true;
});
- current.Invoke<void>(RTC_FROM_HERE, []() {});
+ current.BlockingCall([]() {});
}
EXPECT_TRUE(was_called_back);
}
@@ -323,7 +252,7 @@
// Changed `blocked_calls` to not issue the callback if there are 1 or
// fewer blocking calls (i.e. we set the minimum required number to 2).
blocked_calls.set_minimum_call_count_for_callback(2);
- current.Invoke<void>(RTC_FROM_HERE, []() {});
+ current.BlockingCall([]() {});
}
// We should not have gotten a call back.
EXPECT_FALSE(was_called_back);
@@ -421,23 +350,23 @@
main_thread.ProcessMessages(100);
}
-TEST(ThreadTest, Invoke) {
+TEST(ThreadTest, BlockingCall) {
// Create and start the thread.
auto thread = Thread::CreateWithSocketServer();
thread->Start();
// Try calling functors.
- EXPECT_EQ(42, thread->Invoke<int>(RTC_FROM_HERE, FunctorA()));
- AtomicBool called;
- FunctorB f2(&called);
- thread->Invoke<void>(RTC_FROM_HERE, f2);
- EXPECT_TRUE(called.get());
+ EXPECT_EQ(42, thread->BlockingCall([] { return 42; }));
+ bool called = false;
+ thread->BlockingCall([&] { called = true; });
+ EXPECT_TRUE(called);
+
// Try calling bare functions.
struct LocalFuncs {
static int Func1() { return 999; }
static void Func2() {}
};
- EXPECT_EQ(999, thread->Invoke<int>(RTC_FROM_HERE, &LocalFuncs::Func1));
- thread->Invoke<void>(RTC_FROM_HERE, &LocalFuncs::Func2);
+ EXPECT_EQ(999, thread->BlockingCall(&LocalFuncs::Func1));
+ thread->BlockingCall(&LocalFuncs::Func2);
}
// Verifies that two threads calling Invoke on each other at the same time does
@@ -449,8 +378,8 @@
Thread* main_thread = Thread::Current();
auto other_thread = Thread::CreateWithSocketServer();
other_thread->Start();
- other_thread->Invoke<void>(RTC_FROM_HERE, [main_thread] {
- RTC_EXPECT_DEATH(main_thread->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
+ other_thread->BlockingCall([main_thread] {
+ RTC_EXPECT_DEATH(main_thread->BlockingCall([] {}), "loop");
});
}
@@ -464,10 +393,9 @@
auto third = Thread::Create();
third->Start();
- second->Invoke<void>(RTC_FROM_HERE, [&] {
- third->Invoke<void>(RTC_FROM_HERE, [&] {
- RTC_EXPECT_DEATH(first->Invoke<void>(RTC_FROM_HERE, [] {}), "loop");
- });
+ second->BlockingCall([&] {
+ third->BlockingCall(
+ [&] { RTC_EXPECT_DEATH(first->BlockingCall([] {}), "loop"); });
});
}
@@ -476,7 +404,7 @@
// Verifies that if thread A invokes a call on thread B and thread C is trying
// to invoke A at the same time, thread A does not handle C's invoke while
// invoking B.
-TEST(ThreadTest, ThreeThreadsInvoke) {
+TEST(ThreadTest, ThreeThreadsBlockingCall) {
AutoThread thread;
Thread* thread_a = Thread::Current();
auto thread_b = Thread::CreateWithSocketServer();
@@ -506,7 +434,7 @@
struct LocalFuncs {
static void Set(LockedBool* out) { out->Set(true); }
static void InvokeSet(Thread* thread, LockedBool* out) {
- thread->Invoke<void>(RTC_FROM_HERE, [out] { Set(out); });
+ thread->BlockingCall([out] { Set(out); });
}
// Set `out` true and call InvokeSet on `thread`.
@@ -538,8 +466,7 @@
// Thread B returns when C receives the call and C should be blocked until A
// starts to process messages.
Thread* thread_c_ptr = thread_c.get();
- thread_b->Invoke<void>(RTC_FROM_HERE, [thread_c_ptr, thread_a,
- &thread_a_called] {
+ thread_b->BlockingCall([thread_c_ptr, thread_a, &thread_a_called] {
LocalFuncs::AsyncInvokeSetAndWait(thread_c_ptr, thread_a, &thread_a_called);
});
EXPECT_FALSE(thread_a_called.Get());
diff --git a/test/time_controller/simulated_thread.cc b/test/time_controller/simulated_thread.cc
index dc09783..266b76f 100644
--- a/test/time_controller/simulated_thread.cc
+++ b/test/time_controller/simulated_thread.cc
@@ -61,25 +61,18 @@
}
}
-void SimulatedThread::Send(const rtc::Location& posted_from,
- rtc::MessageHandler* phandler,
- uint32_t id,
- rtc::MessageData* pdata) {
+void SimulatedThread::BlockingCall(rtc::FunctionView<void()> functor) {
if (IsQuitting())
return;
- rtc::Message msg;
- msg.posted_from = posted_from;
- msg.phandler = phandler;
- msg.message_id = id;
- msg.pdata = pdata;
+
if (IsCurrent()) {
- msg.phandler->OnMessage(&msg);
+ functor();
} else {
TaskQueueBase* yielding_from = TaskQueueBase::Current();
handler_->StartYield(yielding_from);
RunReady(Timestamp::MinusInfinity());
CurrentThreadSetter set_current(this);
- msg.phandler->OnMessage(&msg);
+ functor();
handler_->StopYield(yielding_from);
}
}
diff --git a/test/time_controller/simulated_thread.h b/test/time_controller/simulated_thread.h
index b6c1e6e..9272e28 100644
--- a/test/time_controller/simulated_thread.h
+++ b/test/time_controller/simulated_thread.h
@@ -36,10 +36,7 @@
TaskQueueBase* GetAsTaskQueue() override { return this; }
// Thread interface
- void Send(const rtc::Location& posted_from,
- rtc::MessageHandler* phandler,
- uint32_t id,
- rtc::MessageData* pdata) override;
+ void BlockingCall(rtc::FunctionView<void()> functor) override;
void Post(const rtc::Location& posted_from,
rtc::MessageHandler* phandler,
uint32_t id,