Migrate RepeatingTask to take raw pointer to TaskQueueBase instead of TaskQueue
In particular replace call rtc::TaskQueue::Current with TaskQueueBase::Current
Bug: webrtc:10191
Change-Id: I19d42a716d27f0aba087dc70ac65b4ee6249408f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125085
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27005}
diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn
index c0648b7..0129512 100644
--- a/api/task_queue/BUILD.gn
+++ b/api/task_queue/BUILD.gn
@@ -21,6 +21,7 @@
deps = [
"../../rtc_base:checks",
+ "../../rtc_base:macromagic",
"//third_party/abseil-cpp/absl/base:config",
"//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/strings",
diff --git a/api/task_queue/DEPS b/api/task_queue/DEPS
index 9cb6b1f..fab6056 100644
--- a/api/task_queue/DEPS
+++ b/api/task_queue/DEPS
@@ -1,4 +1,9 @@
specific_include_rules = {
+ "task_queue_base\.h": [
+ # Make TaskQueueBase RTC_LOCKABALE to allow annotate variables are only
+ # accessed on specific task queue.
+ "+rtc_base/thread_annotations.h",
+ ],
"task_queue_test\.h": [
"+test/gtest.h",
],
diff --git a/api/task_queue/task_queue_base.h b/api/task_queue/task_queue_base.h
index b1b5cc7..fade005 100644
--- a/api/task_queue/task_queue_base.h
+++ b/api/task_queue/task_queue_base.h
@@ -13,6 +13,7 @@
#include <memory>
#include "api/task_queue/queued_task.h"
+#include "rtc_base/thread_annotations.h"
// TODO(bugs.webrtc.org/10191): Remove when
// rtc::TaskQueue* rtc::TaskQueue::Current() is unused.
@@ -26,7 +27,7 @@
// in FIFO order and that tasks never overlap. Tasks may always execute on the
// same worker thread and they may not. To DCHECK that tasks are executing on a
// known task queue, use IsCurrent().
-class TaskQueueBase {
+class RTC_LOCKABLE TaskQueueBase {
public:
// Starts destruction of the task queue.
// On return ensures no task are running and no new tasks are able to start
diff --git a/call/rtp_transport_controller_send.cc b/call/rtp_transport_controller_send.cc
index 5bf5737..a414c3b 100644
--- a/call/rtp_transport_controller_send.cc
+++ b/call/rtp_transport_controller_send.cc
@@ -472,7 +472,7 @@
void RtpTransportControllerSend::StartProcessPeriodicTasks() {
if (!pacer_queue_update_task_.Running()) {
pacer_queue_update_task_ = RepeatingTaskHandle::DelayedStart(
- &task_queue_, kPacerQueueUpdateInterval, [this]() {
+ task_queue_.Get(), kPacerQueueUpdateInterval, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
TimeDelta expected_queue_time =
TimeDelta::ms(pacer_.ExpectedQueueTimeMs());
@@ -484,7 +484,7 @@
controller_task_.Stop();
if (process_interval_.IsFinite()) {
controller_task_ = RepeatingTaskHandle::DelayedStart(
- &task_queue_, process_interval_, [this]() {
+ task_queue_.Get(), process_interval_, [this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
UpdateControllerWithTimeInterval();
return process_interval_;
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
index 5d2cd6e..c6f8931 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
@@ -333,7 +333,7 @@
void RtcpTransceiverImpl::SchedulePeriodicCompoundPackets(int64_t delay_ms) {
periodic_task_handle_ = RepeatingTaskHandle::DelayedStart(
- config_.task_queue, TimeDelta::ms(delay_ms), [this] {
+ config_.task_queue->Get(), TimeDelta::ms(delay_ms), [this] {
RTC_DCHECK(config_.schedule_periodic_compound_packets);
RTC_DCHECK(ready_to_send_);
SendPeriodicCompoundPacket();
diff --git a/pc/test/fake_periodic_video_source.h b/pc/test/fake_periodic_video_source.h
index 923f10a..00b2e21 100644
--- a/pc/test/fake_periodic_video_source.h
+++ b/pc/test/fake_periodic_video_source.h
@@ -50,7 +50,7 @@
frame_source_.SetRotation(config.rotation);
TimeDelta frame_interval = TimeDelta::ms(config.frame_interval_ms);
- RepeatingTaskHandle::Start(task_queue_.get(), [this, frame_interval] {
+ RepeatingTaskHandle::Start(task_queue_->Get(), [this, frame_interval] {
if (broadcaster_.wants().rotation_applied) {
broadcaster_.OnFrame(frame_source_.GetFrameRotationApplied());
} else {
diff --git a/rtc_base/task_utils/BUILD.gn b/rtc_base/task_utils/BUILD.gn
index 0ec6739..16204d1 100644
--- a/rtc_base/task_utils/BUILD.gn
+++ b/rtc_base/task_utils/BUILD.gn
@@ -14,11 +14,12 @@
"repeating_task.h",
]
deps = [
+ ":to_queued_task",
"..:logging",
- "..:rtc_task_queue",
"..:sequenced_task_checker",
"..:thread_checker",
"..:timeutils",
+ "../../api/task_queue",
"../../api/units:time_delta",
"../../api/units:timestamp",
"//third_party/abseil-cpp/absl/memory",
@@ -44,6 +45,7 @@
deps = [
":repeating_task",
"..:rtc_base_approved",
+ "..:rtc_task_queue",
"../../test:test_support",
"//third_party/abseil-cpp/absl/memory",
]
diff --git a/rtc_base/task_utils/repeating_task.cc b/rtc_base/task_utils/repeating_task.cc
index 5f366cb..8d64334 100644
--- a/rtc_base/task_utils/repeating_task.cc
+++ b/rtc_base/task_utils/repeating_task.cc
@@ -9,12 +9,14 @@
*/
#include "rtc_base/task_utils/repeating_task.h"
+
#include "rtc_base/logging.h"
+#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/time_utils.h"
namespace webrtc {
namespace webrtc_repeating_task_impl {
-RepeatingTaskBase::RepeatingTaskBase(rtc::TaskQueue* task_queue,
+RepeatingTaskBase::RepeatingTaskBase(TaskQueueBase* task_queue,
TimeDelta first_delay)
: task_queue_(task_queue),
next_run_time_(Timestamp::us(rtc::TimeMicros()) + first_delay) {}
@@ -57,10 +59,10 @@
RTC_DLOG(LS_INFO) << "Using PostStop() from the task queue running the "
"repeated task. Consider calling Stop() instead.";
}
- task_queue_->PostTask([this] {
+ task_queue_->PostTask(ToQueuedTask([this] {
RTC_DCHECK_RUN_ON(task_queue_);
Stop();
- });
+ }));
}
} // namespace webrtc_repeating_task_impl
diff --git a/rtc_base/task_utils/repeating_task.h b/rtc_base/task_utils/repeating_task.h
index c4e760a..ee6035c 100644
--- a/rtc_base/task_utils/repeating_task.h
+++ b/rtc_base/task_utils/repeating_task.h
@@ -15,10 +15,11 @@
#include <utility>
#include "absl/memory/memory.h"
+#include "api/task_queue/queued_task.h"
+#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "rtc_base/sequenced_task_checker.h"
-#include "rtc_base/task_queue.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
@@ -26,9 +27,9 @@
class RepeatingTaskHandle;
namespace webrtc_repeating_task_impl {
-class RepeatingTaskBase : public rtc::QueuedTask {
+class RepeatingTaskBase : public QueuedTask {
public:
- RepeatingTaskBase(rtc::TaskQueue* task_queue, TimeDelta first_delay);
+ RepeatingTaskBase(TaskQueueBase* task_queue, TimeDelta first_delay);
~RepeatingTaskBase() override;
virtual TimeDelta RunClosure() = 0;
@@ -39,7 +40,7 @@
void Stop() RTC_RUN_ON(task_queue_);
void PostStop();
- rtc::TaskQueue* const task_queue_;
+ TaskQueueBase* const task_queue_;
// This is always finite, except for the special case where it's PlusInfinity
// to signal that the task should stop.
Timestamp next_run_time_ RTC_GUARDED_BY(task_queue_);
@@ -49,7 +50,7 @@
template <class Closure>
class RepeatingTaskImpl final : public RepeatingTaskBase {
public:
- RepeatingTaskImpl(rtc::TaskQueue* task_queue,
+ RepeatingTaskImpl(TaskQueueBase* task_queue,
TimeDelta first_delay,
Closure&& closure)
: RepeatingTaskBase(task_queue, first_delay),
@@ -91,7 +92,7 @@
// perfectly fine to destroy the handle while the task is running, since the
// repeated task is owned by the TaskQueue.
template <class Closure>
- static RepeatingTaskHandle Start(rtc::TaskQueue* task_queue,
+ static RepeatingTaskHandle Start(TaskQueueBase* task_queue,
Closure&& closure) {
auto repeating_task = absl::make_unique<
webrtc_repeating_task_impl::RepeatingTaskImpl<Closure>>(
@@ -102,13 +103,13 @@
}
template <class Closure>
static RepeatingTaskHandle Start(Closure&& closure) {
- return Start(rtc::TaskQueue::Current(), std::forward<Closure>(closure));
+ return Start(TaskQueueBase::Current(), std::forward<Closure>(closure));
}
// DelayedStart is equivalent to Start except that the first invocation of the
// closure will be delayed by the given amount.
template <class Closure>
- static RepeatingTaskHandle DelayedStart(rtc::TaskQueue* task_queue,
+ static RepeatingTaskHandle DelayedStart(TaskQueueBase* task_queue,
TimeDelta first_delay,
Closure&& closure) {
auto repeating_task = absl::make_unique<
@@ -121,7 +122,7 @@
template <class Closure>
static RepeatingTaskHandle DelayedStart(TimeDelta first_delay,
Closure&& closure) {
- return DelayedStart(rtc::TaskQueue::Current(), first_delay,
+ return DelayedStart(TaskQueueBase::Current(), first_delay,
std::forward<Closure>(closure));
}
diff --git a/rtc_base/task_utils/repeating_task_unittest.cc b/rtc_base/task_utils/repeating_task_unittest.cc
index 52683e3..244bb8e 100644
--- a/rtc_base/task_utils/repeating_task_unittest.cc
+++ b/rtc_base/task_utils/repeating_task_unittest.cc
@@ -15,6 +15,7 @@
#include "absl/memory/memory.h"
#include "rtc_base/event.h"
+#include "rtc_base/task_queue.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "test/gmock.h"
#include "test/gtest.h"
@@ -69,7 +70,7 @@
rtc::TaskQueue task_queue("TestQueue");
std::atomic_int counter(0);
- auto handle = RepeatingTaskHandle::Start(&task_queue, [&] {
+ auto handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] {
if (++counter >= kShortIntervalCount)
return kLongInterval;
return kShortInterval;
@@ -96,7 +97,7 @@
std::atomic_int counter(0);
rtc::TaskQueue task_queue("TestQueue");
- RepeatingTaskHandle::Start(&task_queue, [&] {
+ RepeatingTaskHandle::Start(task_queue.Get(), [&] {
if (++counter == kSleepAtCount)
Sleep(kSleepDuration);
return kRepeatInterval;
@@ -110,7 +111,7 @@
TEST(RepeatingTaskTest, CompensatesForShortRunTime) {
std::atomic_int counter(0);
rtc::TaskQueue task_queue("TestQueue");
- RepeatingTaskHandle::Start(&task_queue, [&] {
+ RepeatingTaskHandle::Start(task_queue.Get(), [&] {
++counter;
// Sleeping for the 10 ms should be compensated.
Sleep(TimeDelta::ms(10));
@@ -130,7 +131,7 @@
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
auto handle = RepeatingTaskHandle::DelayedStart(
- &task_queue, TimeDelta::ms(100), MoveOnlyClosure(&mock));
+ task_queue.Get(), TimeDelta::ms(100), MoveOnlyClosure(&mock));
handle.PostStop();
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@@ -141,7 +142,8 @@
EXPECT_CALL(mock, Call).WillOnce(Return(TimeDelta::ms(100)));
EXPECT_CALL(mock, Delete).WillOnce(Invoke([&done] { done.Set(); }));
rtc::TaskQueue task_queue("queue");
- auto handle = RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&mock));
+ auto handle =
+ RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&mock));
handle.PostStop();
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@@ -151,7 +153,7 @@
rtc::TaskQueue task_queue("TestQueue");
RepeatingTaskHandle handle;
task_queue.PostTask([&] {
- handle = RepeatingTaskHandle::Start(&task_queue, [&] {
+ handle = RepeatingTaskHandle::Start(task_queue.Get(), [&] {
++counter;
handle.Stop();
return TimeDelta::ms(2);
@@ -171,7 +173,7 @@
return kTimeout;
}));
rtc::TaskQueue task_queue("queue");
- RepeatingTaskHandle::Start(&task_queue, MoveOnlyClosure(&closure));
+ RepeatingTaskHandle::Start(task_queue.Get(), MoveOnlyClosure(&closure));
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@@ -186,7 +188,7 @@
return kTimeout;
}));
rtc::TaskQueue task_queue("queue");
- RepeatingTaskHandle::Start(&task_queue, closure.AsStdFunction());
+ RepeatingTaskHandle::Start(task_queue.Get(), closure.AsStdFunction());
EXPECT_TRUE(done.Wait(kTimeout.ms()));
}
@@ -196,7 +198,7 @@
void DoPeriodicTask() {}
TimeDelta TimeUntilNextRun() { return TimeDelta::ms(100); }
void StartPeriodicTask(RepeatingTaskHandle* handle,
- rtc::TaskQueue* task_queue) {
+ TaskQueueBase* task_queue) {
*handle = RepeatingTaskHandle::Start(task_queue, [this] {
DoPeriodicTask();
return TimeUntilNextRun();
@@ -207,10 +209,10 @@
auto object = absl::make_unique<ObjectOnTaskQueue>();
// Create and start the periodic task.
RepeatingTaskHandle handle;
- object->StartPeriodicTask(&handle, &task_queue);
+ object->StartPeriodicTask(&handle, task_queue.Get());
// Restart the task
handle.PostStop();
- object->StartPeriodicTask(&handle, &task_queue);
+ object->StartPeriodicTask(&handle, task_queue.Get());
handle.PostStop();
struct Destructor {
void operator()() { object.reset(); }
diff --git a/test/frame_generator_capturer.cc b/test/frame_generator_capturer.cc
index 885ff2c9..5419d93 100644
--- a/test/frame_generator_capturer.cc
+++ b/test/frame_generator_capturer.cc
@@ -141,8 +141,8 @@
return false;
RepeatingTaskHandle::DelayedStart(
- &task_queue_, TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(),
- [this] {
+ task_queue_.Get(),
+ TimeDelta::seconds(1) / GetCurrentConfiguredFramerate(), [this] {
InsertFrame();
return TimeDelta::seconds(1) / GetCurrentConfiguredFramerate();
});
diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc
index 1c9f249..7558cb4 100644
--- a/test/scenario/network/network_emulation_manager.cc
+++ b/test/scenario/network/network_emulation_manager.cc
@@ -41,7 +41,7 @@
next_node_id_(1),
next_ip4_address_(kMinIPv4Address),
task_queue_("network_emulation_manager") {
- process_task_handle_ = RepeatingTaskHandle::Start(&task_queue_, [this] {
+ process_task_handle_ = RepeatingTaskHandle::Start(task_queue_.Get(), [this] {
ProcessNetworkPackets();
return TimeDelta::ms(kPacketProcessingIntervalMs);
});