Update TaskQueueLibevent implementation to absl::AnyInvocable
Bug: webrtc:14245, webrtc:12889
Change-Id: I1aa20e3d5645c270abd1bee0c45c6982e799eaa4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/268767
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37563}
diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn
index 322fd7a..546a6d6 100644
--- a/rtc_base/BUILD.gn
+++ b/rtc_base/BUILD.gn
@@ -645,10 +645,12 @@
":safe_conversions",
":timeutils",
"../api/task_queue",
+ "../api/units:time_delta",
"synchronization:mutex",
]
absl_deps = [
"//third_party/abseil-cpp/absl/container:inlined_vector",
+ "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings",
]
if (rtc_build_libevent) {
diff --git a/rtc_base/task_queue_libevent.cc b/rtc_base/task_queue_libevent.cc
index ba80a64..f50e5a6 100644
--- a/rtc_base/task_queue_libevent.cc
+++ b/rtc_base/task_queue_libevent.cc
@@ -24,9 +24,10 @@
#include <utility>
#include "absl/container/inlined_vector.h"
+#include "absl/functional/any_invocable.h"
#include "absl/strings/string_view.h"
-#include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h"
+#include "api/units/time_delta.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/numerics/safe_conversions.h"
@@ -106,14 +107,18 @@
TaskQueueLibevent(absl::string_view queue_name, rtc::ThreadPriority priority);
void Delete() override;
- void PostTask(std::unique_ptr<QueuedTask> task) override;
- void PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) override;
+ void PostTask(absl::AnyInvocable<void() &&> task) override;
+ void PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
+ void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) override;
private:
- class SetTimerTask;
struct TimerEvent;
+ void PostDelayedTaskOnTaskQueue(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay);
+
~TaskQueueLibevent() override = default;
static void OnWakeup(int socket, short flags, void* context); // NOLINT
@@ -126,43 +131,20 @@
event wakeup_event_;
rtc::PlatformThread thread_;
Mutex pending_lock_;
- absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> pending_
+ absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> pending_
RTC_GUARDED_BY(pending_lock_);
// Holds a list of events pending timers for cleanup when the loop exits.
std::list<TimerEvent*> pending_timers_;
};
struct TaskQueueLibevent::TimerEvent {
- TimerEvent(TaskQueueLibevent* task_queue, std::unique_ptr<QueuedTask> task)
+ TimerEvent(TaskQueueLibevent* task_queue, absl::AnyInvocable<void() &&> task)
: task_queue(task_queue), task(std::move(task)) {}
~TimerEvent() { event_del(&ev); }
event ev;
TaskQueueLibevent* task_queue;
- std::unique_ptr<QueuedTask> task;
-};
-
-class TaskQueueLibevent::SetTimerTask : public QueuedTask {
- public:
- SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
- : task_(std::move(task)),
- milliseconds_(milliseconds),
- posted_(rtc::Time32()) {}
-
- private:
- bool Run() override {
- // Compensate for the time that has passed since construction
- // and until we got here.
- uint32_t post_time = rtc::Time32() - posted_;
- TaskQueueLibevent::Current()->PostDelayedTask(
- std::move(task_),
- post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
- return true;
- }
-
- std::unique_ptr<QueuedTask> task_;
- const uint32_t milliseconds_;
- const uint32_t posted_;
+ absl::AnyInvocable<void() &&> task;
};
TaskQueueLibevent::TaskQueueLibevent(absl::string_view queue_name,
@@ -219,7 +201,7 @@
delete this;
}
-void TaskQueueLibevent::PostTask(std::unique_ptr<QueuedTask> task) {
+void TaskQueueLibevent::PostTask(absl::AnyInvocable<void() &&> task) {
{
MutexLock lock(&pending_lock_);
bool had_pending_tasks = !pending_.empty();
@@ -242,21 +224,43 @@
sizeof(message));
}
-void TaskQueueLibevent::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
+void TaskQueueLibevent::PostDelayedTaskOnTaskQueue(
+ absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ // libevent api is not thread safe by default, thus event_add need to be
+ // called on the `thread_`.
+ RTC_DCHECK(IsCurrent());
+
+ TimerEvent* timer = new TimerEvent(this, std::move(task));
+ EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
+ timer);
+ pending_timers_.push_back(timer);
+ timeval tv = {.tv_sec = rtc::dchecked_cast<int>(delay.us() / 1'000'000),
+ .tv_usec = rtc::dchecked_cast<int>(delay.us() % 1'000'000)};
+ event_add(&timer->ev, &tv);
+}
+
+void TaskQueueLibevent::PostDelayedTask(absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
if (IsCurrent()) {
- TimerEvent* timer = new TimerEvent(this, std::move(task));
- EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueueLibevent::RunTimer,
- timer);
- pending_timers_.push_back(timer);
- timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
- rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
- event_add(&timer->ev, &tv);
+ PostDelayedTaskOnTaskQueue(std::move(task), delay);
} else {
- PostTask(std::make_unique<SetTimerTask>(std::move(task), milliseconds));
+ int64_t posted_us = rtc::TimeMicros();
+ PostTask([posted_us, delay, task = std::move(task), this]() mutable {
+ // Compensate for the time that has passed since the posting.
+ TimeDelta post_time = TimeDelta::Micros(rtc::TimeMicros() - posted_us);
+ PostDelayedTaskOnTaskQueue(
+ std::move(task), std::max(delay - post_time, TimeDelta::Zero()));
+ });
}
}
+void TaskQueueLibevent::PostDelayedHighPrecisionTask(
+ absl::AnyInvocable<void() &&> task,
+ TimeDelta delay) {
+ PostDelayedTask(std::move(task), delay);
+}
+
// static
void TaskQueueLibevent::OnWakeup(int socket,
short flags, // NOLINT
@@ -271,19 +275,16 @@
event_base_loopbreak(me->event_base_);
break;
case kRunTasks: {
- absl::InlinedVector<std::unique_ptr<QueuedTask>, 4> tasks;
+ absl::InlinedVector<absl::AnyInvocable<void() &&>, 4> tasks;
{
MutexLock lock(&me->pending_lock_);
tasks.swap(me->pending_);
}
RTC_DCHECK(!tasks.empty());
for (auto& task : tasks) {
- if (task->Run()) {
- task.reset();
- } else {
- // `false` means the task should *not* be deleted.
- task.release();
- }
+ std::move(task)();
+ // Prefer to delete the `task` before running the next one.
+ task = nullptr;
}
break;
}
@@ -298,8 +299,7 @@
short flags, // NOLINT
void* context) {
TimerEvent* timer = static_cast<TimerEvent*>(context);
- if (!timer->task->Run())
- timer->task.release();
+ std::move(timer->task)();
timer->task_queue->pending_timers_.remove(timer);
delete timer;
}