Migrate rtc_base and rtc_tools to absl::AnyInvocable based TaskQueueBase interface
Bug: webrtc:14245
Change-Id: I71abe3db7a23ad33bd175297e23fa8e927fa9628
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268768
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37553}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 3ee88f4..322fd7a 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -948,7 +948,6 @@
"../api:sequence_checker",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"synchronization:mutex",
"system:no_unique_address",
@@ -1092,8 +1091,8 @@
"../api/numerics",
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
- "../api/task_queue:to_queued_task",
"../api/transport:field_trial_based_config",
+ "../api/units:time_delta",
"../system_wrappers:field_trial",
"memory:always_valid_pointer",
"network:sent_packet",
@@ -1378,7 +1377,6 @@
":stringutils",
":threading",
":timeutils",
- "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../api/units:timestamp",
"../test:scoped_key_value_config",
@@ -1407,11 +1405,14 @@
":macromagic",
":rtc_event",
":rtc_task_queue",
+ "../api:function_view",
"../api/task_queue",
"../api/task_queue:default_task_queue_factory",
- "../api/task_queue:to_queued_task",
]
- absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/cleanup",
+ "//third_party/abseil-cpp/absl/strings",
+ ]
}
if (rtc_include_tests) {
@@ -1594,7 +1595,6 @@
"../api:make_ref_counted",
"../api:scoped_refptr",
"../api/numerics",
- "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../system_wrappers",
"../test:fileutils",
@@ -1631,6 +1631,7 @@
":rtc_task_queue",
":task_queue_for_test",
":timeutils",
+ "../api/units:time_delta",
"../test:test_main",
"../test:test_support",
]
@@ -1736,7 +1737,6 @@
"../api/task_queue",
"../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue_test",
- "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../test:field_trial",
"../test:fileutils",
@@ -1770,6 +1770,7 @@
}
absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings",
"//third_party/abseil-cpp/absl/types:optional",
diff --git a/rtc_base/async_resolver.cc b/rtc_base/async_resolver.cc
index 2975d9f..198013c 100644
--- a/rtc_base/async_resolver.cc
+++ b/rtc_base/async_resolver.cc
@@ -34,7 +34,6 @@
#endif // defined(WEBRTC_POSIX) && !defined(__native_client__)
#include "api/task_queue/task_queue_base.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/logging.h"
#include "rtc_base/platform_thread.h"
@@ -51,17 +50,17 @@
namespace {
void GlobalGcdRunTask(void* context) {
- std::unique_ptr<webrtc::QueuedTask> task(
- static_cast<webrtc::QueuedTask*>(context));
- task->Run();
+ std::unique_ptr<absl::AnyInvocable<void() &&>> task(
+ static_cast<absl::AnyInvocable<void() &&>*>(context));
+ std::move (*task)();
}
// Post a task into the system-defined global concurrent queue.
-void PostTaskToGlobalQueue(std::unique_ptr<webrtc::QueuedTask> task) {
+void PostTaskToGlobalQueue(
+ std::unique_ptr<absl::AnyInvocable<void() &&>> task) {
dispatch_queue_global_t global_queue =
dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
- webrtc::QueuedTask* context = task.release();
- dispatch_async_f(global_queue, context, &GlobalGcdRunTask);
+ dispatch_async_f(global_queue, task.release(), &GlobalGcdRunTask);
}
} // namespace
@@ -156,7 +155,7 @@
int error = ResolveHostname(addr.hostname(), addr.family(), &addresses);
webrtc::MutexLock lock(&state->mutex);
if (state->status == State::Status::kLive) {
- caller_task_queue->PostTask(webrtc::ToQueuedTask(
+ caller_task_queue->PostTask(
[this, error, addresses = std::move(addresses), state] {
bool live;
{
@@ -169,11 +168,12 @@
RTC_DCHECK_RUN_ON(&sequence_checker_);
ResolveDone(std::move(addresses), error);
}
- }));
+ });
}
};
#if defined(WEBRTC_MAC) || defined(WEBRTC_IOS)
- PostTaskToGlobalQueue(webrtc::ToQueuedTask(std::move(thread_function)));
+ PostTaskToGlobalQueue(
+ std::make_unique<absl::AnyInvocable<void() &&>>(thread_function));
#else
PlatformThread::SpawnDetached(std::move(thread_function), "AsyncResolver");
#endif
diff --git a/rtc_base/fake_mdns_responder.h b/rtc_base/fake_mdns_responder.h
index a7dcb96..8be6f1c 100644
--- a/rtc_base/fake_mdns_responder.h
+++ b/rtc_base/fake_mdns_responder.h
@@ -16,7 +16,6 @@
#include <string>
#include "absl/strings/string_view.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/ip_address.h"
#include "rtc_base/location.h"
#include "rtc_base/mdns_responder_interface.h"
@@ -41,8 +40,7 @@
name = std::to_string(next_available_id_++) + ".local";
addr_name_map_[addr] = name;
}
- thread_->PostTask(
- ToQueuedTask([callback, addr, name]() { callback(addr, name); }));
+ thread_->PostTask([callback, addr, name]() { callback(addr, name); });
}
void RemoveNameForAddress(const rtc::IPAddress& addr,
NameRemovedCallback callback) override {
@@ -51,7 +49,7 @@
addr_name_map_.erase(it);
}
bool result = it != addr_name_map_.end();
- thread_->PostTask(ToQueuedTask([callback, result]() { callback(result); }));
+ thread_->PostTask([callback, result]() { callback(result); });
}
rtc::IPAddress GetMappedAddressForName(absl::string_view name) const {
diff --git a/rtc_base/memory/BUILD.gn b/rtc_base/memory/BUILD.gn
index a42c2e1..778a396 100644
--- a/rtc_base/memory/BUILD.gn
+++ b/rtc_base/memory/BUILD.gn
@@ -36,7 +36,6 @@
"..:rtc_base",
"..:threading",
"../../api/task_queue:pending_task_safety_flag",
- "../../api/task_queue:to_queued_task",
"../synchronization:mutex",
]
}
diff --git a/rtc_base/memory/fifo_buffer.h b/rtc_base/memory/fifo_buffer.h
index 2fa8213..aa3164f 100644
--- a/rtc_base/memory/fifo_buffer.h
+++ b/rtc_base/memory/fifo_buffer.h
@@ -14,7 +14,6 @@
#include <memory>
#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/stream.h"
#include "rtc_base/synchronization/mutex.h"
@@ -81,9 +80,9 @@
private:
void PostEvent(int events, int err) {
- owner_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
- SignalEvent(this, events, err);
- }));
+ owner_->PostTask(webrtc::SafeTask(
+ task_safety_.flag(),
+ [this, events, err]() { SignalEvent(this, events, err); }));
}
// Helper method that implements Read. Caller must acquire a lock
diff --git a/rtc_base/network.cc b/rtc_base/network.cc
index 364a52d..9b53eb3 100644
--- a/rtc_base/network.cc
+++ b/rtc_base/network.cc
@@ -30,8 +30,9 @@
#include "absl/memory/memory.h"
#include "absl/strings/match.h"
#include "absl/strings/string_view.h"
-#include "api/task_queue/to_queued_task.h"
+#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/field_trial_based_config.h"
+#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/memory/always_valid_pointer.h"
@@ -44,6 +45,8 @@
namespace rtc {
namespace {
+using ::webrtc::SafeTask;
+using ::webrtc::TimeDelta;
// List of MAC addresses of known VPN (for windows).
constexpr uint8_t kVpns[2][6] = {
@@ -906,14 +909,14 @@
// we should trigger network signal immediately for the new clients
// to start allocating ports.
if (sent_first_update_)
- thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] {
+ thread_->PostTask(SafeTask(task_safety_flag_, [this] {
RTC_DCHECK_RUN_ON(thread_);
SignalNetworksChanged();
}));
} else {
RTC_DCHECK(task_safety_flag_ == nullptr);
task_safety_flag_ = webrtc::PendingTaskSafetyFlag::Create();
- thread_->PostTask(ToQueuedTask(task_safety_flag_, [this] {
+ thread_->PostTask(SafeTask(task_safety_flag_, [this] {
RTC_DCHECK_RUN_ON(thread_);
UpdateNetworksContinually();
}));
@@ -1029,12 +1032,12 @@
void BasicNetworkManager::UpdateNetworksContinually() {
UpdateNetworksOnce();
- thread_->PostDelayedTask(ToQueuedTask(task_safety_flag_,
- [this] {
- RTC_DCHECK_RUN_ON(thread_);
- UpdateNetworksContinually();
- }),
- kNetworksUpdateIntervalMs);
+ thread_->PostDelayedTask(SafeTask(task_safety_flag_,
+ [this] {
+ RTC_DCHECK_RUN_ON(thread_);
+ UpdateNetworksContinually();
+ }),
+ TimeDelta::Millis(kNetworksUpdateIntervalMs));
}
void BasicNetworkManager::DumpNetworks() {
diff --git a/rtc_base/openssl_stream_adapter.cc b/rtc_base/openssl_stream_adapter.cc
index bc1c5be..da484ad 100644
--- a/rtc_base/openssl_stream_adapter.cc
+++ b/rtc_base/openssl_stream_adapter.cc
@@ -39,7 +39,6 @@
#else
#include "rtc_base/openssl_identity.h"
#endif
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/openssl_utility.h"
#include "rtc_base/ssl_certificate.h"
#include "rtc_base/stream.h"
@@ -60,6 +59,7 @@
namespace rtc {
namespace {
+using ::webrtc::SafeTask;
// SRTP cipher suite table. `internal_name` is used to construct a
// colon-separated profile strings which is needed by
// SSL_CTX_set_tlsext_use_srtp().
@@ -821,8 +821,9 @@
}
void OpenSSLStreamAdapter::PostEvent(int events, int err) {
- owner_->PostTask(webrtc::ToQueuedTask(
- task_safety_, [this, events, err]() { SignalEvent(this, events, err); }));
+ owner_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
+ SignalEvent(this, events, err);
+ }));
}
void OpenSSLStreamAdapter::SetTimeout(int delay_ms) {
diff --git a/rtc_base/ssl_stream_adapter_unittest.cc b/rtc_base/ssl_stream_adapter_unittest.cc
index 8b794a0..49cbbe0 100644
--- a/rtc_base/ssl_stream_adapter_unittest.cc
+++ b/rtc_base/ssl_stream_adapter_unittest.cc
@@ -18,7 +18,6 @@
#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "api/task_queue/pending_task_safety_flag.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/buffer_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/gunit.h"
@@ -36,6 +35,7 @@
using ::testing::tuple;
using ::testing::Values;
using ::testing::WithParamInterface;
+using ::webrtc::SafeTask;
static const int kBlockSize = 4096;
static const char kExporterLabel[] = "label";
@@ -220,7 +220,7 @@
private:
void PostEvent(int events, int err) {
- thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
+ thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
}));
}
@@ -292,7 +292,7 @@
private:
void PostEvent(int events, int err) {
- thread_->PostTask(webrtc::ToQueuedTask(task_safety_, [this, events, err]() {
+ thread_->PostTask(SafeTask(task_safety_.flag(), [this, events, err]() {
SignalEvent(this, events, err);
}));
}
diff --git a/rtc_base/task_queue_for_test.h b/rtc_base/task_queue_for_test.h
index c1de874..450050a 100644
--- a/rtc_base/task_queue_for_test.h
+++ b/rtc_base/task_queue_for_test.h
@@ -13,9 +13,10 @@
#include <utility>
+#include "absl/cleanup/cleanup.h"
#include "absl/strings/string_view.h"
+#include "api/function_view.h"
#include "api/task_queue/task_queue_base.h"
-#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/location.h"
@@ -24,13 +25,14 @@
namespace webrtc {
-template <typename Closure>
-void SendTask(rtc::Location loc, TaskQueueBase* task_queue, Closure&& task) {
+inline void SendTask(rtc::Location loc,
+ TaskQueueBase* task_queue,
+ rtc::FunctionView<void()> task) {
RTC_CHECK(!task_queue->IsCurrent())
<< "Called SendTask to a queue from the same queue at " << loc.ToString();
rtc::Event event;
- task_queue->PostTask(
- ToQueuedTask(std::forward<Closure>(task), [&event] { event.Set(); }));
+ absl::Cleanup cleanup = [&event] { event.Set(); };
+ task_queue->PostTask([task, cleanup = std::move(cleanup)] { task(); });
RTC_CHECK(event.Wait(/*give_up_after_ms=*/rtc::Event::kForever,
/*warn_after_ms=*/10'000))
<< "Waited too long at " << loc.ToString();
@@ -47,24 +49,8 @@
// A convenience, test-only method that blocks the current thread while
// a task executes on the task queue.
- // This variant is specifically for posting custom QueuedTask derived
- // implementations that tests do not want to pass ownership of over to the
- // task queue (i.e. the Run() method always returns `false`.).
- template <class Closure>
- void SendTask(Closure* task) {
- RTC_CHECK(!IsCurrent());
- rtc::Event event;
- PostTask(ToQueuedTask(
- [&task] { RTC_CHECK_EQ(false, static_cast<QueuedTask*>(task)->Run()); },
- [&event] { event.Set(); }));
- event.Wait(rtc::Event::kForever);
- }
-
- // A convenience, test-only method that blocks the current thread while
- // a task executes on the task queue.
- template <class Closure>
- void SendTask(Closure&& task, rtc::Location loc) {
- ::webrtc::SendTask(loc, Get(), std::forward<Closure>(task));
+ void SendTask(rtc::FunctionView<void()> task, rtc::Location loc) {
+ ::webrtc::SendTask(loc, Get(), task);
}
// Wait for the completion of all tasks posted prior to the
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 732f6d7..fef30eb 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -46,7 +46,6 @@
"..:rtc_task_queue",
"..:task_queue_for_test",
"../../api/task_queue",
- "../../api/task_queue:to_queued_task",
"../../api/units:time_delta",
"../../api/units:timestamp",
"../../system_wrappers:system_wrappers",
diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc
index ef165ff..a4eaf2c 100644
--- a/rtc_base/thread.cc
+++ b/rtc_base/thread.cc
@@ -1039,8 +1039,7 @@
void Thread::AllowInvokesToThread(Thread* thread) {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
- PostTask(webrtc::ToQueuedTask(
- [thread, this]() { AllowInvokesToThread(thread); }));
+ PostTask([thread, this]() { AllowInvokesToThread(thread); });
return;
}
RTC_DCHECK_RUN_ON(this);
@@ -1052,7 +1051,7 @@
void Thread::DisallowAllInvokes() {
#if (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
if (!IsCurrent()) {
- PostTask(webrtc::ToQueuedTask([this]() { DisallowAllInvokes(); }));
+ PostTask([this]() { DisallowAllInvokes(); });
return;
}
RTC_DCHECK_RUN_ON(this);
diff --git a/rtc_base/thread.h b/rtc_base/thread.h
index e87248c..7606b37 100644
--- a/rtc_base/thread.h
+++ b/rtc_base/thread.h
@@ -32,7 +32,6 @@
#include "api/function_view.h"
#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
-#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h"
diff --git a/rtc_base/time_utils_unittest.cc b/rtc_base/time_utils_unittest.cc
index ced6e35..33b84d5 100644
--- a/rtc_base/time_utils_unittest.cc
+++ b/rtc_base/time_utils_unittest.cc
@@ -12,7 +12,6 @@
#include <memory>
-#include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/event.h"
#include "rtc_base/fake_clock.h"
@@ -23,6 +22,7 @@
#include "test/gtest.h"
namespace rtc {
+using ::webrtc::TimeDelta;
TEST(TimeTest, TimeInMs) {
int64_t ts_earlier = TimeMillis();
@@ -270,10 +270,9 @@
// Post an event that won't be executed for 10 seconds.
Event message_handler_dispatched;
- worker->PostDelayedTask(webrtc::ToQueuedTask([&message_handler_dispatched] {
- message_handler_dispatched.Set();
- }),
- /*milliseconds=*/60000);
+ worker->PostDelayedTask(
+ [&message_handler_dispatched] { message_handler_dispatched.Set(); },
+ TimeDelta::Seconds(60));
// Wait for a bit for the worker thread to be started and enter its socket
// select(). Otherwise this test would be trivial since the worker thread
diff --git a/rtc_base/unique_id_generator_unittest.cc b/rtc_base/unique_id_generator_unittest.cc
index dc5e9c2..c6eb511 100644
--- a/rtc_base/unique_id_generator_unittest.cc
+++ b/rtc_base/unique_id_generator_unittest.cc
@@ -14,8 +14,10 @@
#include <vector>
#include "absl/algorithm/container.h"
+#include "absl/functional/any_invocable.h"
#include "api/array_view.h"
#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
#include "rtc_base/gunit.h"
#include "rtc_base/helpers.h"
#include "test/gmock.h"
@@ -31,9 +33,11 @@
FakeTaskQueue() : task_queue_setter_(this) {}
void Delete() override {}
- void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override {}
- void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
- uint32_t milliseconds) override {}
+ void PostTask(absl::AnyInvocable<void() &&> task) override {}
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) override {}
+ void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ webrtc::TimeDelta delay) override {}
private:
CurrentTaskQueueSetter task_queue_setter_;