Metronome: disable & refactor for single-threaded operation.

The Chromium implementation unfortunately has a rare deadlock.
Rather than patching that up, we're changing the metronome
implementation to be able to use a single-threaded environment
instead.

The metronome functionality is disabled in VideoReceiveStream2
construction inside call.cc.

The new design does not have listener registration or
deresigstration and instead accepts and invokes callbacks, on
the same sequence that requested the callback. This allows
the clients to use features such as WeakPtrFactories or
ScopedThreadSafety for cancellation.

The CL will be followed up with cleanup CLs that removes
registration APIs once downstream consumers have adapted.

Bug: chromium:1381982
Change-Id: I43732d1971e2276c39b431a04365cd2fc3c55c25
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/282280
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38582}
diff --git a/pc/peer_connection_factory.cc b/pc/peer_connection_factory.cc
index cec909f..5495535 100644
--- a/pc/peer_connection_factory.cc
+++ b/pc/peer_connection_factory.cc
@@ -110,6 +110,8 @@
 
 PeerConnectionFactory::~PeerConnectionFactory() {
   RTC_DCHECK_RUN_ON(signaling_thread());
+  // Ensures the metronome is destroyed on the worker thread.
+  worker_thread()->BlockingCall([metronome = std::move(metronome_)] {});
 }
 
 void PeerConnectionFactory::SetOptions(const Options& options) {
diff --git a/pc/peer_connection_factory.h b/pc/peer_connection_factory.h
index 9bed5ba..2851954 100644
--- a/pc/peer_connection_factory.h
+++ b/pc/peer_connection_factory.h
@@ -152,7 +152,7 @@
   std::unique_ptr<NetEqFactory> neteq_factory_;
   const std::unique_ptr<RtpTransportControllerSendFactoryInterface>
       transport_controller_send_factory_;
-  std::unique_ptr<Metronome> metronome_;
+  std::unique_ptr<Metronome> metronome_ RTC_GUARDED_BY(worker_thread());
 };
 
 }  // namespace webrtc
diff --git a/pc/test/integration_test_helpers.cc b/pc/test/integration_test_helpers.cc
index 3201328..0ad6cbc 100644
--- a/pc/test/integration_test_helpers.cc
+++ b/pc/test/integration_test_helpers.cc
@@ -56,41 +56,25 @@
   return -1;
 }
 
-TaskQueueMetronome::TaskQueueMetronome(TaskQueueFactory* factory,
-                                       TimeDelta tick_period)
-    : tick_period_(tick_period),
-      queue_(factory->CreateTaskQueue("MetronomeQueue",
-                                      TaskQueueFactory::Priority::HIGH)) {
-  tick_task_ = RepeatingTaskHandle::Start(queue_.Get(), [this] {
-    MutexLock lock(&mutex_);
-    for (auto* listener : listeners_) {
-      listener->OnTickTaskQueue()->PostTask([listener] { listener->OnTick(); });
-    }
-    return tick_period_;
-  });
-}
+TaskQueueMetronome::TaskQueueMetronome(TimeDelta tick_period)
+    : tick_period_(tick_period) {}
 
-TaskQueueMetronome::~TaskQueueMetronome() {
-  RTC_DCHECK(listeners_.empty());
-  rtc::Event stop_event;
-  queue_.PostTask([this, &stop_event] {
-    tick_task_.Stop();
-    stop_event.Set();
-  });
-  stop_event.Wait(TimeDelta::Seconds(1));
-}
-
-void TaskQueueMetronome::AddListener(TickListener* listener) {
-  MutexLock lock(&mutex_);
-  auto [it, inserted] = listeners_.insert(listener);
-  RTC_DCHECK(inserted);
-}
-
-void TaskQueueMetronome::RemoveListener(TickListener* listener) {
-  MutexLock lock(&mutex_);
-  auto it = listeners_.find(listener);
-  RTC_DCHECK(it != listeners_.end());
-  listeners_.erase(it);
+void TaskQueueMetronome::RequestCallOnNextTick(
+    absl::AnyInvocable<void() &&> callback) {
+  callbacks_.push_back(std::move(callback));
+  // Only schedule a tick callback for the first `callback` addition.
+  // Schedule on the current task queue to comply with RequestCallOnNextTick
+  // requirements.
+  if (callbacks_.size() == 1) {
+    TaskQueueBase::Current()->PostDelayedTask(
+        [this] {
+          std::vector<absl::AnyInvocable<void() &&>> callbacks;
+          callbacks_.swap(callbacks);
+          for (auto& callback : callbacks)
+            std::move(callback)();
+        },
+        tick_period_);
+  }
 }
 
 TimeDelta TaskQueueMetronome::TickPeriod() const {
diff --git a/pc/test/integration_test_helpers.h b/pc/test/integration_test_helpers.h
index 1a1172c..21ddacb 100644
--- a/pc/test/integration_test_helpers.h
+++ b/pc/test/integration_test_helpers.h
@@ -180,20 +180,15 @@
 
 class TaskQueueMetronome : public webrtc::Metronome {
  public:
-  TaskQueueMetronome(TaskQueueFactory* factory, TimeDelta tick_period);
-  ~TaskQueueMetronome() override;
+  explicit TaskQueueMetronome(TimeDelta tick_period);
 
   // webrtc::Metronome implementation.
-  void AddListener(TickListener* listener) override;
-  void RemoveListener(TickListener* listener) override;
+  void RequestCallOnNextTick(absl::AnyInvocable<void() &&> callback) override;
   TimeDelta TickPeriod() const override;
 
  private:
-  Mutex mutex_;
   const TimeDelta tick_period_;
-  std::set<TickListener*> listeners_ RTC_GUARDED_BY(mutex_);
-  RepeatingTaskHandle tick_task_;
-  rtc::TaskQueue queue_;
+  std::vector<absl::AnyInvocable<void() &&>> callbacks_;
 };
 
 class SignalingMessageReceiver {
@@ -775,8 +770,8 @@
     pc_factory_dependencies.task_queue_factory =
         webrtc::CreateDefaultTaskQueueFactory();
     pc_factory_dependencies.trials = std::make_unique<FieldTrialBasedConfig>();
-    pc_factory_dependencies.metronome = std::make_unique<TaskQueueMetronome>(
-        pc_factory_dependencies.task_queue_factory.get(), TimeDelta::Millis(8));
+    pc_factory_dependencies.metronome =
+        std::make_unique<TaskQueueMetronome>(TimeDelta::Millis(8));
     cricket::MediaEngineDependencies media_deps;
     media_deps.task_queue_factory =
         pc_factory_dependencies.task_queue_factory.get();