Update ScopedOperationsBatcher tasks with finalizers to propagate errors The Run() method now returns RTCError instead of void. Tasks with a finalizer, now return RTCError. Upon encountering an error while running batched tasks on the target thread, the batcher will abort and return. This is to align with how these operations are today applied in a non batched manner. Already queued up finalizers will be invoked. Adding new type declarations for the callbacks for improved readability: SimpleBatchTask, FinalizerTask and BatchTaskWithFinalizer. Bug: webrtc:42222804 Change-Id: I03150abcdcc4fed41b18de5033fdb256f5802822 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/460540 Reviewed-by: Danil Chapovalov <danilchap@webrtc.org> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#47288}
diff --git a/pc/BUILD.gn b/pc/BUILD.gn index a903e35..5db70e7 100644 --- a/pc/BUILD.gn +++ b/pc/BUILD.gn
@@ -1088,8 +1088,10 @@ "scoped_operations_batcher.h", ] deps = [ + "../api:rtc_error", "../api:sequence_checker", "../rtc_base:checks", + "../rtc_base:logging", "../rtc_base:threading", "../rtc_base/system:no_unique_address", "//third_party/abseil-cpp/absl/functional:any_invocable",
diff --git a/pc/scoped_operations_batcher.cc b/pc/scoped_operations_batcher.cc index eea4157..ad6ccc5 100644 --- a/pc/scoped_operations_batcher.cc +++ b/pc/scoped_operations_batcher.cc
@@ -16,8 +16,10 @@ #include <vector> #include "absl/functional/any_invocable.h" +#include "api/rtc_error.h" #include "api/sequence_checker.h" #include "rtc_base/checks.h" +#include "rtc_base/logging.h" #include "rtc_base/thread.h" namespace webrtc { @@ -28,57 +30,69 @@ } ScopedOperationsBatcher::~ScopedOperationsBatcher() { - Run(); + RTCError error = Run(); + if (!error.ok()) { + RTC_LOG(LS_ERROR) << "Batcher failed: " << error.message(); + } } -void ScopedOperationsBatcher::Run() { +RTCError ScopedOperationsBatcher::Run() { RTC_DCHECK_RUN_ON(&sequence_checker_); - std::vector<absl::AnyInvocable<void() &&>> return_tasks; + std::vector<FinalizerTask> return_tasks; size_t task_idx = 0; bool target_thread_is_current = target_thread_->IsCurrent(); + RTCError error = RTCError::OK(); while (task_idx < tasks_.size()) { target_thread_->BlockingCall([&] { while (task_idx < tasks_.size()) { - if (auto* void_task = - std::get_if<absl::AnyInvocable<void() &&>>(&tasks_[task_idx])) { + if (auto* void_task = std::get_if<SimpleBatchTask>(&tasks_[task_idx])) { std::move (*void_task)(); } else { - auto* returning_task = std::get_if< - absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&>>( - &tasks_[task_idx]); + auto* returning_task = + std::get_if<BatchTaskWithFinalizer>(&tasks_[task_idx]); RTC_DCHECK(returning_task); auto ret = std::move(*returning_task)(); - if (ret) { - return_tasks.push_back(std::move(ret)); + if (ret.ok()) { + if (ret.value()) { + return_tasks.push_back(std::move(ret.value())); + } + } else { + error = ret.MoveError(); } } ++task_idx; + if (!error.ok()) { + return; + } if (!target_thread_is_current && target_thread_->HasPendingTasks()) { return; } } }); + if (!error.ok()) { + break; + } } - RTC_DCHECK_EQ(task_idx, tasks_.size()); tasks_.clear(); for (auto& task : return_tasks) { std::move(task)(); } + + return error; } -void ScopedOperationsBatcher::Add(absl::AnyInvocable<void() &&> task) { +void ScopedOperationsBatcher::Add(SimpleBatchTask task) { RTC_DCHECK_RUN_ON(&sequence_checker_); if (task) { tasks_.emplace_back(std::move(task)); } } -void ScopedOperationsBatcher::AddWithFinalizer( - absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&> task) { +void ScopedOperationsBatcher::AddWithFinalizer(BatchTaskWithFinalizer task) { RTC_DCHECK_RUN_ON(&sequence_checker_); if (task) { tasks_.emplace_back(std::move(task));
diff --git a/pc/scoped_operations_batcher.h b/pc/scoped_operations_batcher.h index 6a5f905..2c481d4 100644 --- a/pc/scoped_operations_batcher.h +++ b/pc/scoped_operations_batcher.h
@@ -15,6 +15,7 @@ #include <vector> #include "absl/functional/any_invocable.h" +#include "api/rtc_error.h" #include "api/sequence_checker.h" #include "rtc_base/system/no_unique_address.h" #include "rtc_base/thread.h" @@ -31,27 +32,30 @@ // batcher will cooperatively yield the thread and resume execution in a // subsequent `BlockingCall`. // -// Tasks can either have a `void` return type, or return a new task. +// Tasks can either have a `void` return type, or return a new task or return an +// error. // Any tasks returned by the executed worker thread tasks are collected // and subsequently executed on the calling thread (typically the signaling // thread) after the worker thread operations have completed. class ScopedOperationsBatcher { public: + using SimpleBatchTask = absl::AnyInvocable<void() &&>; + using FinalizerTask = absl::AnyInvocable<void() &&>; + using BatchTaskWithFinalizer = + absl::AnyInvocable<RTCErrorOr<FinalizerTask>() &&>; + explicit ScopedOperationsBatcher(Thread* target_thread); ~ScopedOperationsBatcher(); - void Run(); + RTCError Run(); // Queues non-nullptr tasks to be executed on the target thread when the // ScopedOperationsBatcher goes out of scope. - void Add(absl::AnyInvocable<void() &&> task); - void AddWithFinalizer( - absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&> task); + void Add(SimpleBatchTask task); + void AddWithFinalizer(BatchTaskWithFinalizer task); private: - using BatchedTask = - std::variant<absl::AnyInvocable<void() &&>, - absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&>>; + using BatchedTask = std::variant<SimpleBatchTask, BatchTaskWithFinalizer>; RTC_NO_UNIQUE_ADDRESS SequenceChecker sequence_checker_; Thread* const target_thread_;
diff --git a/pc/scoped_operations_batcher_unittest.cc b/pc/scoped_operations_batcher_unittest.cc index d8699967..afe36b7 100644 --- a/pc/scoped_operations_batcher_unittest.cc +++ b/pc/scoped_operations_batcher_unittest.cc
@@ -15,6 +15,7 @@ #include <vector> #include "absl/functional/any_invocable.h" +#include "api/rtc_error.h" #include "rtc_base/thread.h" #include "test/gtest.h" @@ -57,14 +58,14 @@ { ScopedOperationsBatcher batcher(target_thread.get()); - absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&> task = - [&]() -> absl::AnyInvocable<void() &&> { + ScopedOperationsBatcher::BatchTaskWithFinalizer task = + [&]() -> RTCErrorOr<ScopedOperationsBatcher::FinalizerTask> { task_executed = true; task_thread = Thread::Current(); - return [&]() { + return ScopedOperationsBatcher::FinalizerTask([&]() { return_task_executed = true; return_task_thread = Thread::Current(); - }; + }); }; batcher.AddWithFinalizer(std::move(task)); } @@ -100,5 +101,54 @@ EXPECT_EQ(execution_order, std::vector<int>({1, 2, 3, 4, 5})); } +TEST(ScopedOperationsBatcherTest, StopsExecutionOnError) { + auto target_thread = Thread::Create(); + target_thread->Start(); + + bool task1_executed = false; + bool task2_executed = false; + bool task3_executed = false; + // There's no finalizer for task2. + bool finalizer1_executed = false; + bool finalizer3_executed = false; + + RTCError error = RTCError::OK(); + { + ScopedOperationsBatcher batcher(target_thread.get()); + // Task 1. + batcher.AddWithFinalizer( + [&]() -> RTCErrorOr<ScopedOperationsBatcher::FinalizerTask> { + task1_executed = true; + return ScopedOperationsBatcher::FinalizerTask( + [&] { finalizer1_executed = true; }); + }); + // Task 2. + batcher.AddWithFinalizer( + [&]() -> RTCErrorOr<ScopedOperationsBatcher::FinalizerTask> { + task2_executed = true; + return RTCError(RTCErrorType::INVALID_STATE, "Failed"); + }); + // Task 3. + batcher.AddWithFinalizer( + [&]() -> RTCErrorOr<ScopedOperationsBatcher::FinalizerTask> { + task3_executed = true; + return ScopedOperationsBatcher::FinalizerTask( + [&] { finalizer3_executed = true; }); + }); + + error = batcher.Run(); + } + + EXPECT_FALSE(error.ok()); + EXPECT_EQ(error.type(), RTCErrorType::INVALID_STATE); + EXPECT_STREQ(error.message(), "Failed"); + + EXPECT_TRUE(task1_executed); + EXPECT_TRUE(finalizer1_executed); + EXPECT_TRUE(task2_executed); + EXPECT_FALSE(task3_executed); + EXPECT_FALSE(finalizer3_executed); +} + } // namespace } // namespace webrtc