In rtc::Thread implement posting AnyInvocable
Lots of code call rtc::Thread directly instead of through TaskQueueBase
interface, thus to continue migration step by step rtc::Thread needs
to implement both old and new TaskQueueBase interfaces.
Bug: webrtc:14245
Change-Id: Ie7cac897a4c8a6227b8d467a39adb30aec6f1318
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267984
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37474}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 7344695..e259fc8 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -914,6 +914,8 @@
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/base:core_headers",
+ "//third_party/abseil-cpp/absl/cleanup",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
]
deps = [
@@ -943,6 +945,7 @@
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task",
+ "../api/units:time_delta",
"synchronization:mutex",
"system:no_unique_address",
"system:rtc_export",
@@ -1730,6 +1733,7 @@
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue_test",
"../api/task_queue:to_queued_task",
+ "../api/units:time_delta",
"../test:field_trial",
"../test:fileutils",
"../test:rtc_expect_death",
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index e1db2bc..ef165ff 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -31,8 +31,8 @@
#include <utility>
#include "absl/algorithm/container.h"
+#include "absl/cleanup/cleanup.h"
#include "api/sequence_checker.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/event.h"
@@ -72,22 +72,25 @@
namespace rtc {
namespace {
-class MessageHandlerWithTask final : public MessageHandler {
+struct AnyInvocableMessage final : public MessageData {
+ explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
+ : task(std::move(task)) {}
+ absl::AnyInvocable<void() &&> task;
+};
+
+class AnyInvocableMessageHandler final : public MessageHandler {
public:
- MessageHandlerWithTask() {}
-
- MessageHandlerWithTask(const MessageHandlerWithTask&) = delete;
- MessageHandlerWithTask& operator=(const MessageHandlerWithTask&) = delete;
-
void OnMessage(Message* msg) override {
- static_cast<rtc_thread_internal::MessageLikeTask*>(msg->pdata)->Run();
+ std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
delete msg->pdata;
}
-
- private:
- ~MessageHandlerWithTask() override {}
};
+MessageHandler* GetAnyInvocableMessageHandler() {
+ static MessageHandler* const handler = new AnyInvocableMessageHandler;
+ return handler;
+}
+
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
public:
MarkProcessingCritScope(const RecursiveCriticalSection* cs,
@@ -761,8 +764,7 @@
void Thread::SetDispatchWarningMs(int deadline) {
if (!IsCurrent()) {
- PostTask(webrtc::ToQueuedTask(
- [this, deadline]() { SetDispatchWarningMs(deadline); }));
+ PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
return;
}
RTC_DCHECK_RUN_ON(this);
@@ -948,18 +950,19 @@
done_event.reset(new rtc::Event());
bool ready = false;
- PostTask(webrtc::ToQueuedTask(
- [&msg]() mutable { msg.phandler->OnMessage(&msg); },
- [this, &ready, current_thread, done = done_event.get()] {
- if (current_thread) {
- CritScope cs(&crit_);
- ready = true;
- current_thread->socketserver()->WakeUp();
- } else {
- done->Set();
- }
- }));
-
+ absl::Cleanup cleanup = [this, &ready, current_thread,
+ done = done_event.get()] {
+ if (current_thread) {
+ CritScope cs(&crit_);
+ ready = true;
+ current_thread->socketserver()->WakeUp();
+ } else {
+ done->Set();
+ }
+ };
+ PostTask([&msg, cleanup = std::move(cleanup)]() mutable {
+ msg.phandler->OnMessage(&msg);
+ });
if (current_thread) {
bool waited = false;
crit_.Enter();
@@ -1115,6 +1118,28 @@
delete this;
}
+void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
+ // Though Post takes MessageData by raw pointer (last parameter), it still
+ // takes it with ownership.
+ Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
+ /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
+void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ // This implementation does not support low precision yet.
+ PostDelayedHighPrecisionTask(std::move(task), delay);
+}
+
+void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) {
+ int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
+ // Though PostDelayed takes MessageData by raw pointer (last parameter),
+ // it still takes it with ownership.
+ PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
+ /*id=*/0, new AnyInvocableMessage(std::move(task)));
+}
+
bool Thread::IsProcessingMessagesForTesting() {
return (owned_ || IsCurrent()) && !IsQuitting();
}
@@ -1183,13 +1208,6 @@
#endif
}
-// static
-MessageHandler* Thread::GetPostTaskMessageHandler() {
- // Allocate at first call, never deallocate.
- static MessageHandler* handler = new MessageHandlerWithTask;
- return handler;
-}
-
AutoThread::AutoThread()
: Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
if (!ThreadManager::Instance()->CurrentThread()) {
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index 3364684..e87248c 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -28,10 +28,12 @@
#include <pthread.h>
#endif
#include "absl/base/attributes.h"
+#include "absl/functional/any_invocable.h"
#include "api/function_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h"
+#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/location.h"
@@ -79,32 +81,6 @@
class Thread;
-namespace rtc_thread_internal {
-
-class MessageLikeTask : public MessageData {
- public:
- virtual void Run() = 0;
-};
-
-template <class FunctorT>
-class MessageWithFunctor final : public MessageLikeTask {
- public:
- explicit MessageWithFunctor(FunctorT&& functor)
- : functor_(std::forward<FunctorT>(functor)) {}
-
- MessageWithFunctor(const MessageWithFunctor&) = delete;
- MessageWithFunctor& operator=(const MessageWithFunctor&) = delete;
-
- void Run() override { functor_(); }
-
- private:
- ~MessageWithFunctor() override {}
-
- typename std::remove_reference<FunctorT>::type functor_;
-};
-
-} // namespace rtc_thread_internal
-
class RTC_EXPORT ThreadManager {
public:
static const int kForever = -1;
@@ -418,36 +394,29 @@
bool IsInvokeToThreadAllowed(rtc::Thread* target);
// From TaskQueueBase
+ void Delete() override;
+ void PostTask(absl::AnyInvocable<void() &&> task) override;
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) override;
+ void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) override;
+
+ // Legacy TaskQueueBase methods, do not use in new code.
+ // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
+ // directly is updated to use PostTask methods above.
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override;
void PostDelayedHighPrecisionTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override;
- void Delete() override;
- // Helper methods to avoid having to do ToQueuedTask() at the calling places.
- template <class Closure,
- typename std::enable_if<!std::is_convertible<
- Closure,
- std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
- void PostTask(Closure&& closure) {
- PostTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)));
- }
- template <class Closure,
- typename std::enable_if<!std::is_convertible<
- Closure,
- std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
- void PostDelayedTask(Closure&& closure, uint32_t milliseconds) {
- PostDelayedTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)),
- milliseconds);
- }
- template <class Closure,
- typename std::enable_if<!std::is_convertible<
- Closure,
- std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
- void PostDelayedHighPrecisionTask(Closure&& closure, uint32_t milliseconds) {
- PostDelayedHighPrecisionTask(
- webrtc::ToQueuedTask(std::forward<Closure>(closure)), milliseconds);
+ // Legacy helper method, do not use in new code.
+ // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
+ // directly is updated to use PostTask methods above.
+ ABSL_DEPRECATED("Pass delay as webrtc::TimeDelta type")
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ uint32_t milliseconds) {
+ PostDelayedTask(std::move(task), webrtc::TimeDelta::Millis(milliseconds));
}
// ProcessMessages will process I/O and dispatch messages until:
@@ -609,10 +578,6 @@
// Called by the ThreadManager when being unset as the current thread.
void ClearCurrentTaskQueue();
- // Returns a static-lifetime MessageHandler which runs message with
- // MessageLikeTask payload data.
- static MessageHandler* GetPostTaskMessageHandler();
-
bool fPeekKeep_;
Message msgPeek_;
MessageList messages_ RTC_GUARDED_BY(crit_);
diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc
index 321cbc3..7fcf7ca 100644
--- a/rtc_base/thread_unittest.cc
+++ b/rtc_base/thread_unittest.cc
@@ -14,7 +14,7 @@
#include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/task_queue_test.h"
-#include "api/task_queue/to_queued_task.h"
+#include "api/units/time_delta.h"
#include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h"
#include "rtc_base/checks.h"
@@ -36,7 +36,7 @@
namespace rtc {
namespace {
-using ::webrtc::ToQueuedTask;
+using ::webrtc::TimeDelta;
// Generates a sequence of numbers (collaboratively).
class TestGenerator {
@@ -373,8 +373,8 @@
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
- thread1->PostTask(ToQueuedTask(
- [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+ thread1->PostTask(
+ [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
main_thread.ProcessMessages(100);
}
@@ -389,11 +389,11 @@
thread1->AllowInvokesToThread(thread2.get());
thread1->AllowInvokesToThread(thread3.get());
- thread1->PostTask(ToQueuedTask([&]() {
+ thread1->PostTask([&]() {
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
- }));
+ });
main_thread.ProcessMessages(100);
}
@@ -405,9 +405,8 @@
thread1->DisallowAllInvokes();
- thread1->PostTask(ToQueuedTask([&]() {
- EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get()));
- }));
+ thread1->PostTask(
+ [&]() { EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
main_thread.ProcessMessages(100);
}
#endif // (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
@@ -418,8 +417,8 @@
auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer();
- thread1->PostTask(ToQueuedTask(
- [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }));
+ thread1->PostTask(
+ [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
main_thread.ProcessMessages(100);
}
@@ -672,11 +671,11 @@
};
// Post messages (both delayed and non delayed) to both threads.
- a->PostTask(ToQueuedTask(incrementer));
- b->PostTask(ToQueuedTask(incrementer));
- a->PostDelayedTask(ToQueuedTask(incrementer), 0);
- b->PostDelayedTask(ToQueuedTask(incrementer), 0);
- main_thread.PostTask(ToQueuedTask(event_signaler));
+ a->PostTask(incrementer);
+ b->PostTask(incrementer);
+ a->PostDelayedTask(incrementer, TimeDelta::Zero());
+ b->PostDelayedTask(incrementer, TimeDelta::Zero());
+ main_thread.PostTask(event_signaler);
ThreadManager::ProcessAllMessageQueuesForTesting();
EXPECT_EQ(4, messages_processed.load(std::memory_order_acquire));
@@ -1083,7 +1082,7 @@
WaitAndSetEvent(&event_set_by_test_thread,
&event_set_by_background_thread);
},
- /*milliseconds=*/10);
+ TimeDelta::Millis(10));
event_set_by_test_thread.Set();
event_set_by_background_thread.Wait(Event::kForever);
}
@@ -1100,18 +1099,18 @@
background_thread->PostDelayedTask(
[&third, &fourth] { WaitAndSetEvent(&third, &fourth); },
- /*milliseconds=*/11);
+ TimeDelta::Millis(11));
background_thread->PostDelayedTask(
[&first, &second] { WaitAndSetEvent(&first, &second); },
- /*milliseconds=*/9);
+ TimeDelta::Millis(9));
background_thread->PostDelayedTask(
[&second, &third] { WaitAndSetEvent(&second, &third); },
- /*milliseconds=*/10);
+ TimeDelta::Millis(10));
// All tasks have been posted before the first one is unblocked.
first.Set();
// Only if the chain is invoked in delay order will the last event be set.
- clock.AdvanceTime(webrtc::TimeDelta::Millis(11));
+ clock.AdvanceTime(TimeDelta::Millis(11));
EXPECT_TRUE(fourth.Wait(0));
}