Cleanup: Prepares for simulated time peer connection tests.

This CL contains some preparatory cleanup that can be done
outside the main CL.

Bug: webrtc:11255
Change-Id: Ib0dcd81d352bafc446dcd2f7f82ba81f5e82e210
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165766
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#30247}
diff --git a/test/network/emulated_network_manager.cc b/test/network/emulated_network_manager.cc
index b4ee7d3..a3b1691 100644
--- a/test/network/emulated_network_manager.cc
+++ b/test/network/emulated_network_manager.cc
@@ -14,6 +14,7 @@
 #include <utility>
 
 #include "absl/memory/memory.h"
+#include "test/network/fake_network_socket_server.h"
 
 namespace webrtc {
 namespace test {
@@ -24,18 +25,18 @@
     EndpointsContainer* endpoints_container)
     : task_queue_(task_queue),
       endpoints_container_(endpoints_container),
-      socket_server_(endpoints_container),
-      network_thread_(&socket_server_),
+      network_thread_(std::make_unique<rtc::Thread>(
+          std::make_unique<FakeNetworkSocketServer>(endpoints_container))),
       sent_first_update_(false),
       start_count_(0) {
-  network_thread_.SetName("net_thread", nullptr);
-  network_thread_.Start();
+  network_thread_->SetName("net_thread", nullptr);
+  network_thread_->Start();
 }
 
 void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) {
   RTC_CHECK(endpoints_container_->HasEndpoint(endpoint))
       << "No such interface: " << endpoint->GetPeerLocalAddress().ToString();
-  network_thread_.PostTask(RTC_FROM_HERE, [this, endpoint]() {
+  network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() {
     endpoint->Enable();
     UpdateNetworksOnce();
   });
@@ -44,7 +45,7 @@
 void EmulatedNetworkManager::DisableEndpoint(EmulatedEndpointImpl* endpoint) {
   RTC_CHECK(endpoints_container_->HasEndpoint(endpoint))
       << "No such interface: " << endpoint->GetPeerLocalAddress().ToString();
-  network_thread_.PostTask(RTC_FROM_HERE, [this, endpoint]() {
+  network_thread_->PostTask(RTC_FROM_HERE, [this, endpoint]() {
     endpoint->Disable();
     UpdateNetworksOnce();
   });
@@ -53,23 +54,24 @@
 // Network manager interface. All these methods are supposed to be called from
 // the same thread.
 void EmulatedNetworkManager::StartUpdating() {
-  RTC_DCHECK_RUN_ON(&network_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_.get());
 
   if (start_count_) {
     // If network interfaces are already discovered and signal is sent,
     // we should trigger network signal immediately for the new clients
     // to start allocating ports.
     if (sent_first_update_)
-      network_thread_.PostTask(RTC_FROM_HERE,
-                               [this]() { MaybeSignalNetworksChanged(); });
+      network_thread_->PostTask(RTC_FROM_HERE,
+                                [this]() { MaybeSignalNetworksChanged(); });
   } else {
-    network_thread_.PostTask(RTC_FROM_HERE, [this]() { UpdateNetworksOnce(); });
+    network_thread_->PostTask(RTC_FROM_HERE,
+                              [this]() { UpdateNetworksOnce(); });
   }
   ++start_count_;
 }
 
 void EmulatedNetworkManager::StopUpdating() {
-  RTC_DCHECK_RUN_ON(&network_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_.get());
   if (!start_count_)
     return;
 
@@ -87,7 +89,7 @@
 }
 
 void EmulatedNetworkManager::UpdateNetworksOnce() {
-  RTC_DCHECK_RUN_ON(&network_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_.get());
 
   std::vector<rtc::Network*> networks;
   for (std::unique_ptr<rtc::Network>& net :
@@ -105,7 +107,7 @@
 }
 
 void EmulatedNetworkManager::MaybeSignalNetworksChanged() {
-  RTC_DCHECK_RUN_ON(&network_thread_);
+  RTC_DCHECK_RUN_ON(network_thread_.get());
   // If manager is stopped we don't need to signal anything.
   if (start_count_ == 0) {
     return;
diff --git a/test/network/emulated_network_manager.h b/test/network/emulated_network_manager.h
index bd04987..951ed91 100644
--- a/test/network/emulated_network_manager.h
+++ b/test/network/emulated_network_manager.h
@@ -21,7 +21,6 @@
 #include "rtc_base/socket_server.h"
 #include "rtc_base/thread.h"
 #include "rtc_base/thread_checker.h"
-#include "test/network/fake_network_socket_server.h"
 #include "test/network/network_emulation.h"
 
 namespace webrtc {
@@ -48,7 +47,7 @@
   void GetAnyAddressNetworks(NetworkList* networks) override {}
 
   // EmulatedNetworkManagerInterface API
-  rtc::Thread* network_thread() override { return &network_thread_; }
+  rtc::Thread* network_thread() override { return network_thread_.get(); }
   rtc::NetworkManager* network_manager() override { return this; }
   void GetStats(
       std::function<void(EmulatedNetworkStats)> stats_callback) const override;
@@ -59,8 +58,7 @@
 
   TaskQueueForTest* const task_queue_;
   EndpointsContainer* const endpoints_container_;
-  FakeNetworkSocketServer socket_server_;
-  rtc::Thread network_thread_;
+  std::unique_ptr<rtc::Thread> network_thread_;
 
   bool sent_first_update_ RTC_GUARDED_BY(network_thread_);
   int start_count_ RTC_GUARDED_BY(network_thread_);
diff --git a/test/peer_scenario/peer_scenario.cc b/test/peer_scenario/peer_scenario.cc
index ddc4b5b..098971c 100644
--- a/test/peer_scenario/peer_scenario.cc
+++ b/test/peer_scenario/peer_scenario.cc
@@ -84,9 +84,9 @@
   net()->CreateRoute(callee->endpoint(), ret_link, caller->endpoint());
   auto signaling = ConnectSignaling(caller, callee, send_link, ret_link);
   signaling.StartIceSignaling();
-  rtc::Event done;
+  std::atomic<bool> done(false);
   signaling.NegotiateSdp(
-      [&](const SessionDescriptionInterface&) { done.Set(); });
+      [&](const SessionDescriptionInterface&) { done = true; });
   RTC_CHECK(WaitAndProcess(&done));
 }
 
@@ -99,13 +99,15 @@
   receiver->AddVideoReceiveSink(send_track->id(), &pair->decode_tap_);
 }
 
-bool PeerScenario::WaitAndProcess(rtc::Event* event, TimeDelta max_duration) {
-  constexpr int kStepMs = 5;
-  if (event->Wait(0))
+bool PeerScenario::WaitAndProcess(std::atomic<bool>* event,
+                                  TimeDelta max_duration) {
+  const auto kStep = TimeDelta::ms(5);
+  if (*event)
     return true;
-  for (int elapsed = 0; elapsed < max_duration.ms(); elapsed += kStepMs) {
-    thread()->ProcessMessages(kStepMs);
-    if (event->Wait(0))
+  for (auto elapsed = TimeDelta::Zero(); elapsed < max_duration;
+       elapsed += kStep) {
+    thread()->ProcessMessages(kStep.ms());
+    if (*event)
       return true;
   }
   return false;
diff --git a/test/peer_scenario/peer_scenario.h b/test/peer_scenario/peer_scenario.h
index 8040f5d..4a1759a 100644
--- a/test/peer_scenario/peer_scenario.h
+++ b/test/peer_scenario/peer_scenario.h
@@ -84,7 +84,7 @@
                                   PeerScenarioClient* receiver);
 
   // Waits on |event| while processing messages on the signaling thread.
-  bool WaitAndProcess(rtc::Event* event,
+  bool WaitAndProcess(std::atomic<bool>* event,
                       TimeDelta max_duration = TimeDelta::seconds(5));
 
   // Process messages on the signaling thread for the given duration.
diff --git a/test/peer_scenario/peer_scenario_client.h b/test/peer_scenario/peer_scenario_client.h
index 7517304..61a7741 100644
--- a/test/peer_scenario/peer_scenario_client.h
+++ b/test/peer_scenario/peer_scenario_client.h
@@ -147,12 +147,12 @@
 
  private:
   const std::map<int, EmulatedEndpoint*> endpoints_;
+  TaskQueueFactory* task_queue_factory_;
   rtc::Thread* const signaling_thread_;
   const std::unique_ptr<LogWriterFactoryInterface> log_writer_factory_;
   const std::unique_ptr<rtc::Thread> worker_thread_;
   CallbackHandlers handlers_ RTC_GUARDED_BY(signaling_thread_);
   const std::unique_ptr<PeerConnectionObserver> observer_;
-  TaskQueueFactory* task_queue_factory_;
   std::map<std::string, std::vector<rtc::VideoSinkInterface<VideoFrame>*>>
       track_id_to_video_sinks_ RTC_GUARDED_BY(signaling_thread_);
   std::list<std::unique_ptr<IceCandidateInterface>> pending_ice_candidates_
diff --git a/test/peer_scenario/tests/peer_scenario_quality_test.cc b/test/peer_scenario/tests/peer_scenario_quality_test.cc
index 11aab07..16ba707 100644
--- a/test/peer_scenario/tests/peer_scenario_quality_test.cc
+++ b/test/peer_scenario/tests/peer_scenario_quality_test.cc
@@ -15,20 +15,22 @@
 namespace test {
 
 TEST(PeerScenarioQualityTest, PsnrIsCollected) {
-  VideoQualityAnalyzerConfig analyzer_config;
-  analyzer_config.thread = rtc::Thread::Current();
-  VideoQualityAnalyzer analyzer(analyzer_config);
-  PeerScenario s(*test_info_);
-  auto caller = s.CreateClient(PeerScenarioClient::Config());
-  auto callee = s.CreateClient(PeerScenarioClient::Config());
-  PeerScenarioClient::VideoSendTrackConfig video_conf;
-  video_conf.generator.squares_video->framerate = 20;
-  auto video = caller->CreateVideo("VIDEO", video_conf);
-  auto link_builder = s.net()->NodeBuilder().delay_ms(100).capacity_kbps(600);
-  s.AttachVideoQualityAnalyzer(&analyzer, video.track, callee);
-  s.SimpleConnection(caller, callee, {link_builder.Build().node},
-                     {link_builder.Build().node});
-  s.ProcessMessages(TimeDelta::seconds(2));
+  VideoQualityAnalyzer analyzer;
+  {
+    PeerScenario s(*test_info_);
+    auto caller = s.CreateClient(PeerScenarioClient::Config());
+    auto callee = s.CreateClient(PeerScenarioClient::Config());
+    PeerScenarioClient::VideoSendTrackConfig video_conf;
+    video_conf.generator.squares_video->framerate = 20;
+    auto video = caller->CreateVideo("VIDEO", video_conf);
+    auto link_builder = s.net()->NodeBuilder().delay_ms(100).capacity_kbps(600);
+    s.AttachVideoQualityAnalyzer(&analyzer, video.track, callee);
+    s.SimpleConnection(caller, callee, {link_builder.Build().node},
+                       {link_builder.Build().node});
+    s.ProcessMessages(TimeDelta::seconds(2));
+    // Exit scope to ensure that there's no pending tasks reporting to analyzer.
+  }
+
   // We expect ca 40 frames to be produced, but to avoid flakiness on slow
   // machines we only test for 10.
   EXPECT_GT(analyzer.stats().render.count, 10);
diff --git a/test/peer_scenario/tests/remote_estimate_test.cc b/test/peer_scenario/tests/remote_estimate_test.cc
index 75f41b6..b882ad9 100644
--- a/test/peer_scenario/tests/remote_estimate_test.cc
+++ b/test/peer_scenario/tests/remote_estimate_test.cc
@@ -54,7 +54,7 @@
 
   auto signaling = s.ConnectSignaling(caller, callee, send_link, ret_link);
   caller->CreateVideo("VIDEO", PeerScenarioClient::VideoSendTrackConfig());
-  rtc::Event offer_exchange_done;
+  std::atomic<bool> offer_exchange_done(false);
   signaling.NegotiateSdp(
       [](SessionDescriptionInterface* offer) {
         for (auto& cont : offer->description()->contents()) {
@@ -65,14 +65,14 @@
         for (auto& cont : answer.description()->contents()) {
           EXPECT_TRUE(cont.media_description()->remote_estimate());
         }
-        offer_exchange_done.Set();
+        offer_exchange_done = true;
       });
   RTC_CHECK(s.WaitAndProcess(&offer_exchange_done));
 }
 
 TEST(RemoteEstimateEndToEnd, AudioUsesAbsSendTimeExtension) {
   // Defined before PeerScenario so it gets destructed after, to avoid use after free.
-  rtc::Event received_abs_send_time;
+  std::atomic<bool> received_abs_send_time(false);
   PeerScenario s(*test_info_);
 
   auto* caller = s.CreateClient(PeerScenarioClient::Config());
@@ -88,7 +88,7 @@
   caller->CreateAudio("AUDIO", cricket::AudioOptions());
   signaling.StartIceSignaling();
   RtpHeaderExtensionMap extension_map;
-  rtc::Event offer_exchange_done;
+  std::atomic<bool> offer_exchange_done(false);
   signaling.NegotiateSdp(
       [&extension_map](SessionDescriptionInterface* offer) {
         extension_map = AudioExtensions(*offer);
@@ -97,7 +97,7 @@
       [&](const SessionDescriptionInterface& answer) {
         EXPECT_TRUE(AudioExtensions(answer).IsRegistered(
             kRtpExtensionAbsoluteSendTime));
-        offer_exchange_done.Set();
+        offer_exchange_done = true;
       });
   RTC_CHECK(s.WaitAndProcess(&offer_exchange_done));
   send_node->router()->SetWatcher(
@@ -110,7 +110,7 @@
           auto extensions = GetRtpPacketExtensions(packet.data, extension_map);
           if (extensions) {
             EXPECT_TRUE(extensions->hasAbsoluteSendTime);
-            received_abs_send_time.Set();
+            received_abs_send_time = true;
           }
         }
       });