Enhance ScopedOperationsBatcher to support tasks returning tasks. A common pattern for negotiation operations that involve the signaling thread as well as one or both of the worker/network threads, is to a) apply partial state on the signaling thread, b) perform work on the worker/network thread(s), c) run completion work on the signaling thread that depends on the outcome of the previous step. This adds support for that pattern to the ScopedOperationsBatcher so that this pattern can be applied to batched operations. Bug: webrtc:42222804 Change-Id: I776786216208991c1716a1ab2808c9b37f30ead8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/449362 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org> Cr-Commit-Position: refs/heads/main@{#47195}
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 6048d98..24abb0d 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc
@@ -112,6 +112,15 @@ namespace webrtc { namespace { +// Batches operations to be executed synchronously on the worker thread. +// +// When the batcher goes out of scope (or `Run()` is explicitly called), it +// executes all queued tasks in a single `BlockingCall` to the worker thread. +// +// Tasks can either have a `void` return type, or return a new task. +// Any tasks returned by the executed worker thread tasks are collected +// and subsequently executed on the calling thread (typically the signaling +// thread) after the worker thread operations have completed. class ScopedOperationsBatcher { public: explicit ScopedOperationsBatcher(Thread* worker_thread) @@ -122,13 +131,26 @@ ~ScopedOperationsBatcher() { Run(); } void Run() { + std::vector<absl::AnyInvocable<void() &&>> signaling_tasks; if (!tasks_.empty()) { - worker_thread_->BlockingCall([tasks = std::move(tasks_)]() mutable { - for (auto& task : tasks) { - std::move(task)(); + worker_thread_->BlockingCall([&] { + for (auto& task : tasks_) { + if (task.void_task) { + std::move(task.void_task)(); + } else { + RTC_DCHECK(task.returning_task); + auto ret = std::move(task.returning_task)(); + if (ret) { + signaling_tasks.push_back(std::move(ret)); + } + } } }); - RTC_DCHECK(tasks_.empty()); + tasks_.clear(); + } + + for (auto& task : signaling_tasks) { + std::move(task)(); } } @@ -136,13 +158,24 @@ // ScopedOperationsBatcher goes out of scope. void push_back(absl::AnyInvocable<void() &&> task) { if (task) { - tasks_.push_back(std::move(task)); + tasks_.push_back({.void_task = std::move(task)}); + } + } + + void push_back(absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&> task) { + if (task) { + tasks_.push_back({.returning_task = std::move(task)}); } } private: + struct BatchedTask { + absl::AnyInvocable<void() &&> void_task; + absl::AnyInvocable<absl::AnyInvocable<void() &&>() &&> returning_task; + }; + Thread* const worker_thread_; - std::vector<absl::AnyInvocable<void() &&>> tasks_; + std::vector<BatchedTask> tasks_; }; struct DtlsTransportAndName {