Update TaskQueueStdlib implementation to absl::AnyInvocable
Bug: webrtc:14245
Change-Id: Ic0c55cbb4dbdd31359bbe15f1acd7a2b7e9e61f7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268901
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37568}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index c183f5f..528f124 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -712,6 +712,7 @@
]
deps = [
":checks",
+ ":divide_round",
":logging",
":macromagic",
":platform_thread",
@@ -719,9 +720,13 @@
":safe_conversions",
":timeutils",
"../api/task_queue",
+ "../api/units:time_delta",
"synchronization:mutex",
]
- absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
+ absl_deps = [
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
+ "//third_party/abseil-cpp/absl/strings",
+ ]
}
rtc_library("weak_ptr") {
diff --git a/rtc_base/task_queue_stdlib.cc b/rtc_base/task_queue_stdlib.cc
index 946261e..19e7dea 100644
--- a/rtc_base/task_queue_stdlib.cc
+++ b/rtc_base/task_queue_stdlib.cc
@@ -18,12 +18,14 @@
#include <queue>
#include <utility>
+#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
-#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
+#include "rtc_base/numerics/divide_round.h"
#include "rtc_base/platform_thread.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h"
@@ -50,27 +52,29 @@
~TaskQueueStdlib() override = default;
void Delete() override;
- void PostTask(std::unique_ptr<QueuedTask> task) override;
- void PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) override;
+ void PostTask(absl::AnyInvocable<void() &&> task) override;
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
+ void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
private:
using OrderId = uint64_t;
struct DelayedEntryTimeout {
- int64_t next_fire_at_ms_{};
- OrderId order_{};
+ int64_t next_fire_at_us{};
+ OrderId order{};
bool operator<(const DelayedEntryTimeout& o) const {
- return std::tie(next_fire_at_ms_, order_) <
- std::tie(o.next_fire_at_ms_, o.order_);
+ return std::tie(next_fire_at_us, order) <
+ std::tie(o.next_fire_at_us, o.order);
}
};
struct NextTask {
- bool final_task_ = false;
- std::unique_ptr<QueuedTask> run_task_;
- int64_t sleep_time_ms_ = 0;
+ bool final_task = false;
+ absl::AnyInvocable<void() &&> run_task;
+ int64_t sleep_time_ms = rtc::Event::kForever;
};
static rtc::PlatformThread InitializeThread(TaskQueueStdlib* me,
@@ -97,7 +101,7 @@
// The list of all pending tasks that need to be processed in the
// FIFO queue ordering on the worker thread.
- std::queue<std::pair<OrderId, std::unique_ptr<QueuedTask>>> pending_queue_
+ std::queue<std::pair<OrderId, absl::AnyInvocable<void() &&>>> pending_queue_
RTC_GUARDED_BY(pending_lock_);
// The list of all pending tasks that need to be processed at a future
@@ -105,8 +109,8 @@
// happen at exactly the same time interval as another task then the
// task is processed based on FIFO ordering. std::priority_queue was
// considered but rejected due to its inability to extract the
- // std::unique_ptr out of the queue without the presence of a hack.
- std::map<DelayedEntryTimeout, std::unique_ptr<QueuedTask>> delayed_queue_
+ // move-only value out of the queue without the presence of a hack.
+ std::map<DelayedEntryTimeout, absl::AnyInvocable<void() &&>> delayed_queue_
RTC_GUARDED_BY(pending_lock_);
// Contains the active worker thread assigned to processing
@@ -151,43 +155,45 @@
delete this;
}
-void TaskQueueStdlib::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueStdlib::PostTask(absl::AnyInvocable<void() &&> task) {
{
MutexLock lock(&pending_lock_);
- OrderId order = thread_posting_order_++;
-
- pending_queue_.push(std::pair<OrderId, std::unique_ptr<QueuedTask>>(
- order, std::move(task)));
+ pending_queue_.push(
+ std::make_pair(++thread_posting_order_, std::move(task)));
}
NotifyWake();
}
-void TaskQueueStdlib::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- const auto fire_at = rtc::TimeMillis() + milliseconds;
-
- DelayedEntryTimeout delay;
- delay.next_fire_at_ms_ = fire_at;
+void TaskQueueStdlib::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ DelayedEntryTimeout delayed_entry;
+ delayed_entry.next_fire_at_us = rtc::TimeMicros() + delay.us();
{
MutexLock lock(&pending_lock_);
- delay.order_ = ++thread_posting_order_;
- delayed_queue_[delay] = std::move(task);
+ delayed_entry.order = ++thread_posting_order_;
+ delayed_queue_[delayed_entry] = std::move(task);
}
NotifyWake();
}
+void TaskQueueStdlib::PostDelayedHighPrecisionTask(
+ absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ PostDelayedTask(std::move(task), delay);
+}
+
TaskQueueStdlib::NextTask TaskQueueStdlib::GetNextTask() {
NextTask result;
- const auto tick = rtc::TimeMillis();
+ const int64_t tick_us = rtc::TimeMicros();
MutexLock lock(&pending_lock_);
if (thread_should_quit_) {
- result.final_task_ = true;
+ result.final_task = true;
return result;
}
@@ -195,29 +201,30 @@
auto delayed_entry = delayed_queue_.begin();
const auto& delay_info = delayed_entry->first;
auto& delay_run = delayed_entry->second;
- if (tick >= delay_info.next_fire_at_ms_) {
+ if (tick_us >= delay_info.next_fire_at_us) {
if (pending_queue_.size() > 0) {
auto& entry = pending_queue_.front();
auto& entry_order = entry.first;
auto& entry_run = entry.second;
- if (entry_order < delay_info.order_) {
- result.run_task_ = std::move(entry_run);
+ if (entry_order < delay_info.order) {
+ result.run_task = std::move(entry_run);
pending_queue_.pop();
return result;
}
}
- result.run_task_ = std::move(delay_run);
+ result.run_task = std::move(delay_run);
delayed_queue_.erase(delayed_entry);
return result;
}
- result.sleep_time_ms_ = delay_info.next_fire_at_ms_ - tick;
+ result.sleep_time_ms =
+ DivideRoundUp(delay_info.next_fire_at_us - tick_us, 1'000);
}
if (pending_queue_.size() > 0) {
auto& entry = pending_queue_.front();
- result.run_task_ = std::move(entry.second);
+ result.run_task = std::move(entry.second);
pending_queue_.pop();
}
@@ -228,21 +235,18 @@
while (true) {
auto task = GetNextTask();
- if (task.final_task_)
+ if (task.final_task)
break;
- if (task.run_task_) {
+ if (task.run_task) {
// process entry immediately then try again
- QueuedTask* release_ptr = task.run_task_.release();
- if (release_ptr->Run())
- delete release_ptr;
+ std::move(task.run_task)();
// Attempt to run more tasks before going to sleep.
continue;
}
- flag_notify_.Wait(0 == task.sleep_time_ms_ ? rtc::Event::kForever
- : task.sleep_time_ms_);
+ flag_notify_.Wait(task.sleep_time_ms);
}
}