Reland "Using simulated rtc::Thread for peer connection scenario tests." This is a reland of b70c5c5ce97e7dcf2e1d8453f5ea0639d4b60453 Original change's description: > Using simulated rtc::Thread for peer connection scenario tests. > > Bug: webrtc:11255 > Change-Id: I5d29e997a7209ffc64595082358cca9b2115d07a > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/165689 > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > Reviewed-by: Steve Anton <steveanton@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#30258} Bug: webrtc:11255 Change-Id: If65cd56b59158cebec5609407a721fbdb47cfd1b Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/166046 Reviewed-by: Steve Anton <steveanton@webrtc.org> Commit-Queue: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#30294}
diff --git a/api/DEPS b/api/DEPS index a86b42f..bac4232 100644 --- a/api/DEPS +++ b/api/DEPS
@@ -265,6 +265,10 @@ "+rtc_base/thread_checker.h", ], + "time_controller\.h": [ + "+rtc_base/thread.h", + ], + "videocodec_test_fixture\.h": [ "+modules/video_coding/include/video_codec_interface.h" ],
diff --git a/api/test/time_controller.h b/api/test/time_controller.h index 70aabda..6c47e91 100644 --- a/api/test/time_controller.h +++ b/api/test/time_controller.h
@@ -12,12 +12,14 @@ #include <functional> #include <memory> +#include <string> #include "api/task_queue/task_queue_factory.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" #include "modules/utility/include/process_thread.h" #include "rtc_base/synchronization/yield_policy.h" +#include "rtc_base/thread.h" #include "system_wrappers/include/clock.h" namespace webrtc { @@ -37,6 +39,15 @@ // Creates a process thread. virtual std::unique_ptr<ProcessThread> CreateProcessThread( const char* thread_name) = 0; + // Creates an rtc::Thread instance. If |socket_server| is nullptr, a default + // noop socket server is created. + virtual std::unique_ptr<rtc::Thread> CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server = nullptr) = 0; + + // Creates an rtc::Thread instance that ensure that it's set as the current + // thread. + virtual rtc::Thread* GetMainThread() = 0; // Allow task queues and process threads created by this instance to execute // for the given |duration|. virtual void AdvanceTime(TimeDelta duration) = 0;
diff --git a/test/network/emulated_network_manager.cc b/test/network/emulated_network_manager.cc index a3b1691..2dc2fad 100644 --- a/test/network/emulated_network_manager.cc +++ b/test/network/emulated_network_manager.cc
@@ -20,18 +20,16 @@ namespace test { EmulatedNetworkManager::EmulatedNetworkManager( - Clock* clock, + TimeController* time_controller, TaskQueueForTest* task_queue, EndpointsContainer* endpoints_container) : task_queue_(task_queue), endpoints_container_(endpoints_container), - network_thread_(std::make_unique<rtc::Thread>( + network_thread_(time_controller->CreateThread( + "net_thread", std::make_unique<FakeNetworkSocketServer>(endpoints_container))), sent_first_update_(false), - start_count_(0) { - network_thread_->SetName("net_thread", nullptr); - network_thread_->Start(); -} + start_count_(0) {} void EmulatedNetworkManager::EnableEndpoint(EmulatedEndpointImpl* endpoint) { RTC_CHECK(endpoints_container_->HasEndpoint(endpoint))
diff --git a/test/network/emulated_network_manager.h b/test/network/emulated_network_manager.h index 951ed91..92555ee 100644 --- a/test/network/emulated_network_manager.h +++ b/test/network/emulated_network_manager.h
@@ -15,6 +15,7 @@ #include <vector> #include "api/test/network_emulation_manager.h" +#include "api/test/time_controller.h" #include "rtc_base/critical_section.h" #include "rtc_base/ip_address.h" #include "rtc_base/network.h" @@ -31,7 +32,7 @@ public sigslot::has_slots<>, public EmulatedNetworkManagerInterface { public: - EmulatedNetworkManager(Clock* clock, + EmulatedNetworkManager(TimeController* time_controller, TaskQueueForTest* task_queue, EndpointsContainer* endpoints_container);
diff --git a/test/network/network_emulation_manager.cc b/test/network/network_emulation_manager.cc index 77ac460..494e885 100644 --- a/test/network/network_emulation_manager.cc +++ b/test/network/network_emulation_manager.cc
@@ -34,7 +34,8 @@ NetworkEmulationManagerImpl::NetworkEmulationManagerImpl( TimeController* time_controller) - : clock_(time_controller->GetClock()), + : time_controller_(time_controller), + clock_(time_controller->GetClock()), next_node_id_(1), next_ip4_address_(kMinIPv4Address), task_queue_(time_controller->GetTaskQueueFactory()->CreateTaskQueue( @@ -265,7 +266,7 @@ auto endpoints_container = std::make_unique<EndpointsContainer>(endpoint_impls); auto network_manager = std::make_unique<EmulatedNetworkManager>( - clock_, &task_queue_, endpoints_container.get()); + time_controller_, &task_queue_, endpoints_container.get()); for (auto* endpoint : endpoints) { // Associate endpoint with network manager. bool insertion_result =
diff --git a/test/network/network_emulation_manager.h b/test/network/network_emulation_manager.h index d640ea2..25c8050 100644 --- a/test/network/network_emulation_manager.h +++ b/test/network/network_emulation_manager.h
@@ -88,6 +88,7 @@ absl::optional<rtc::IPAddress> GetNextIPv4Address(); Timestamp Now() const; + TimeController* const time_controller_; Clock* const clock_; int next_node_id_;
diff --git a/test/peer_scenario/BUILD.gn b/test/peer_scenario/BUILD.gn index a4e2c79..9faff92 100644 --- a/test/peer_scenario/BUILD.gn +++ b/test/peer_scenario/BUILD.gn
@@ -30,12 +30,14 @@ "../../api:libjingle_peerconnection_api", "../../api:network_emulation_manager_api", "../../api:rtc_stats_api", + "../../api:time_controller", "../../api/audio_codecs:builtin_audio_decoder_factory", "../../api/audio_codecs:builtin_audio_encoder_factory", "../../api/rtc_event_log:rtc_event_log_factory", "../../api/task_queue:default_task_queue_factory", "../../api/video_codecs:builtin_video_decoder_factory", "../../api/video_codecs:builtin_video_encoder_factory", + "../../call:call_interfaces", "../../media:rtc_audio_video", "../../media:rtc_media_base", "../../modules/audio_device:audio_device_impl", @@ -43,10 +45,12 @@ "../../p2p:rtc_p2p", "../../pc:pc_test_utils", "../../pc:rtc_pc_base", + "../../rtc_base", "../../rtc_base:stringutils", "..//network:emulated_network", "../logging:log_writer", "../scenario", + "../time_controller", "//third_party/abseil-cpp/absl/flags:flag", "//third_party/abseil-cpp/absl/memory", ]
diff --git a/test/peer_scenario/peer_scenario.cc b/test/peer_scenario/peer_scenario.cc index 098971c..99d97c7 100644 --- a/test/peer_scenario/peer_scenario.cc +++ b/test/peer_scenario/peer_scenario.cc
@@ -11,10 +11,13 @@ #include "absl/flags/flag.h" #include "absl/memory/memory.h" +#include "rtc_base/null_socket_server.h" #include "rtc_base/string_encode.h" #include "rtc_base/strings/string_builder.h" #include "test/logging/file_log_writer.h" #include "test/testsupport/file_utils.h" +#include "test/time_controller/real_time_controller.h" +#include "test/time_controller/simulated_time_controller.h" ABSL_FLAG(bool, peer_logs, false, "Save logs from peer scenario framework."); ABSL_FLAG(std::string, @@ -38,19 +41,34 @@ } return nullptr; } + +std::unique_ptr<TimeController> CreateTimeController(bool real_time) { + if (real_time) { + return std::make_unique<RealTimeController>(); + } else { + // Using an offset of 100000 to get nice fixed width and readable timestamps + // in typical test scenarios. + const Timestamp kSimulatedStartTime = Timestamp::seconds(100000); + return std::make_unique<GlobalSimulatedTimeController>(kSimulatedStartTime); + } +} } // namespace -PeerScenario::PeerScenario(const testing::TestInfo& test_info) - : PeerScenario(std::string(test_info.test_suite_name()) + "/" + - test_info.name()) {} +PeerScenario::PeerScenario(const testing::TestInfo& test_info, bool real_time) + : PeerScenario( + std::string(test_info.test_suite_name()) + "/" + test_info.name(), + real_time) {} -PeerScenario::PeerScenario(std::string file_name) - : PeerScenario(GetPeerScenarioLogManager(file_name)) {} +PeerScenario::PeerScenario(std::string file_name, bool real_time) + : PeerScenario(GetPeerScenarioLogManager(file_name), real_time) {} PeerScenario::PeerScenario( - std::unique_ptr<LogWriterFactoryInterface> log_writer_manager) - : signaling_thread_(rtc::Thread::Current()), - log_writer_manager_(std::move(log_writer_manager)) {} + std::unique_ptr<LogWriterFactoryInterface> log_writer_manager, + bool real_time) + : log_writer_manager_(std::move(log_writer_manager)), + time_controller_(CreateTimeController(real_time)), + signaling_thread_(time_controller_->GetMainThread()), + net_(time_controller_.get()) {} PeerScenarioClient* PeerScenario::CreateClient( PeerScenarioClient::Config config) { @@ -61,8 +79,8 @@ PeerScenarioClient* PeerScenario::CreateClient( std::string name, PeerScenarioClient::Config config) { - peer_clients_.emplace_back(net(), thread(), GetLogWriterFactory(name), - config); + peer_clients_.emplace_back(net(), time_controller_.get(), thread(), + GetLogWriterFactory(name), config); return &peer_clients_.back(); } @@ -106,7 +124,7 @@ return true; for (auto elapsed = TimeDelta::Zero(); elapsed < max_duration; elapsed += kStep) { - thread()->ProcessMessages(kStep.ms()); + time_controller_->AdvanceTime(kStep); if (*event) return true; } @@ -114,7 +132,7 @@ } void PeerScenario::ProcessMessages(TimeDelta duration) { - thread()->ProcessMessages(duration.ms()); + time_controller_->AdvanceTime(duration); } std::unique_ptr<LogWriterFactoryInterface> PeerScenario::GetLogWriterFactory(
diff --git a/test/peer_scenario/peer_scenario.h b/test/peer_scenario/peer_scenario.h index 4a1759a..94dd442 100644 --- a/test/peer_scenario/peer_scenario.h +++ b/test/peer_scenario/peer_scenario.h
@@ -21,6 +21,7 @@ #include <list> #include <vector> +#include "api/test/time_controller.h" #include "test/gtest.h" #include "test/logging/log_writer.h" #include "test/network/network_emulation_manager.h" @@ -31,7 +32,6 @@ namespace webrtc { namespace test { - // The PeerScenario class represents a PeerConnection simulation scenario. The // main purpose is to maintain ownership and ensure safe destruction order of // clients and network emulation. Additionally it reduces the amount of boiler @@ -46,10 +46,12 @@ // The name is used for log output when those are enabled by the --peer_logs // command line flag. Optionally, the TestInfo struct available in gtest can // be used to automatically generate a path based on the test name. - explicit PeerScenario(const testing::TestInfo& test_info); - explicit PeerScenario(std::string file_name); + explicit PeerScenario(const testing::TestInfo& test_info, + bool real_time = false); + explicit PeerScenario(std::string file_name, bool real_time = false); explicit PeerScenario( - std::unique_ptr<LogWriterFactoryInterface> log_writer_manager); + std::unique_ptr<LogWriterFactoryInterface> log_writer_manager, + bool real_time = false); NetworkEmulationManagerImpl* net() { return &net_; } rtc::Thread* thread() { return signaling_thread_; } @@ -102,13 +104,15 @@ CapturedFrameTap capture_tap_; DecodedFrameTap decode_tap_; }; + Clock* clock() { return Clock::GetRealTimeClock(); } std::unique_ptr<LogWriterFactoryInterface> GetLogWriterFactory( std::string name); - rtc::Thread* const signaling_thread_; const std::unique_ptr<LogWriterFactoryInterface> log_writer_manager_; + const std::unique_ptr<TimeController> time_controller_; + rtc::Thread* const signaling_thread_; std::list<PeerVideoQualityPair> video_quality_pairs_; NetworkEmulationManagerImpl net_; std::list<PeerScenarioClient> peer_clients_;
diff --git a/test/peer_scenario/peer_scenario_client.cc b/test/peer_scenario/peer_scenario_client.cc index 782cd21..0ee709e 100644 --- a/test/peer_scenario/peer_scenario_client.cc +++ b/test/peer_scenario/peer_scenario_client.cc
@@ -19,6 +19,7 @@ #include "api/task_queue/default_task_queue_factory.h" #include "api/video_codecs/builtin_video_decoder_factory.h" #include "api/video_codecs/builtin_video_encoder_factory.h" +#include "call/call.h" #include "media/engine/webrtc_media_engine.h" #include "modules/audio_device/include/test_audio_device.h" #include "p2p/client/basic_port_allocator.h" @@ -112,22 +113,51 @@ private: PeerScenarioClient::CallbackHandlers* handlers_; }; + +// Used to supply a unique_ptr for an unowned TaskQueueFactory. +class TaskQueueFactoryWrapper final : public TaskQueueFactory { + public: + explicit TaskQueueFactoryWrapper(TaskQueueFactory* inner_factory) + : inner_factory_(inner_factory) {} + std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return inner_factory_->CreateTaskQueue(name, priority); + } + + private: + TaskQueueFactory* const inner_factory_; +}; + +class TimeControllerBasedCallFactory : public CallFactoryInterface { + public: + explicit TimeControllerBasedCallFactory(TimeController* time_controller) + : time_controller_(time_controller) {} + Call* CreateCall(const Call::Config& config) override { + return Call::Create(config, time_controller_->GetClock(), + time_controller_->CreateProcessThread("CallModules"), + time_controller_->CreateProcessThread("Pacer")); + } + + private: + TimeController* time_controller_; +}; + } // namespace PeerScenarioClient::PeerScenarioClient( NetworkEmulationManager* net, + TimeController* time_controller, rtc::Thread* signaling_thread, std::unique_ptr<LogWriterFactoryInterface> log_writer_factory, PeerScenarioClient::Config config) : endpoints_(CreateEndpoints(net, config.endpoints)), + task_queue_factory_(time_controller->GetTaskQueueFactory()), signaling_thread_(signaling_thread), log_writer_factory_(std::move(log_writer_factory)), - worker_thread_(rtc::Thread::Create()), + worker_thread_(time_controller->CreateThread("worker")), handlers_(config.handlers), observer_(new LambdaPeerConnectionObserver(&handlers_)) { - worker_thread_->SetName("worker", this); - worker_thread_->Start(); - handlers_.on_track.push_back( [this](rtc::scoped_refptr<RtpTransceiverInterface> transceiver) { auto track = transceiver->receiver()->track().get(); @@ -160,9 +190,10 @@ pcf_deps.network_thread = manager->network_thread(); pcf_deps.signaling_thread = signaling_thread_; pcf_deps.worker_thread = worker_thread_.get(); - pcf_deps.call_factory = CreateCallFactory(); - pcf_deps.task_queue_factory = CreateDefaultTaskQueueFactory(); - task_queue_factory_ = pcf_deps.task_queue_factory.get(); + pcf_deps.call_factory = + std::make_unique<TimeControllerBasedCallFactory>(time_controller); + pcf_deps.task_queue_factory = + std::make_unique<TaskQueueFactoryWrapper>(task_queue_factory_); pcf_deps.event_log_factory = std::make_unique<RtcEventLogFactory>(task_queue_factory_);
diff --git a/test/peer_scenario/peer_scenario_client.h b/test/peer_scenario/peer_scenario_client.h index 61a7741..e9b86e2 100644 --- a/test/peer_scenario/peer_scenario_client.h +++ b/test/peer_scenario/peer_scenario_client.h
@@ -20,6 +20,7 @@ #include "absl/memory/memory.h" #include "api/peer_connection_interface.h" #include "api/test/network_emulation_manager.h" +#include "api/test/time_controller.h" #include "pc/test/frame_generator_capturer_video_track_source.h" #include "test/logging/log_writer.h" @@ -107,6 +108,7 @@ PeerScenarioClient( NetworkEmulationManager* net, + TimeController* time_controller, rtc::Thread* signaling_thread, std::unique_ptr<LogWriterFactoryInterface> log_writer_factory, Config config); @@ -147,7 +149,7 @@ private: const std::map<int, EmulatedEndpoint*> endpoints_; - TaskQueueFactory* task_queue_factory_; + TaskQueueFactory* const task_queue_factory_; rtc::Thread* const signaling_thread_; const std::unique_ptr<LogWriterFactoryInterface> log_writer_factory_; const std::unique_ptr<rtc::Thread> worker_thread_;
diff --git a/test/time_controller/BUILD.gn b/test/time_controller/BUILD.gn index acb2ccb..5a159c7 100644 --- a/test/time_controller/BUILD.gn +++ b/test/time_controller/BUILD.gn
@@ -20,6 +20,8 @@ "simulated_process_thread.h", "simulated_task_queue.cc", "simulated_task_queue.h", + "simulated_thread.cc", + "simulated_thread.h", "simulated_time_controller.cc", "simulated_time_controller.h", ] @@ -33,6 +35,7 @@ "../../modules:module_api", "../../modules/utility:utility", "../../rtc_base", + "../../rtc_base:checks", "../../rtc_base:rtc_base_tests_utils", "../../rtc_base:rtc_event", "../../rtc_base/synchronization:sequence_checker",
diff --git a/test/time_controller/external_time_controller.cc b/test/time_controller/external_time_controller.cc index bb60d89..dfeae81 100644 --- a/test/time_controller/external_time_controller.cc +++ b/test/time_controller/external_time_controller.cc
@@ -21,6 +21,7 @@ #include "api/units/timestamp.h" #include "modules/include/module.h" #include "modules/utility/include/process_thread.h" +#include "rtc_base/checks.h" #include "rtc_base/synchronization/yield_policy.h" #include "test/time_controller/simulated_time_controller.h" @@ -184,6 +185,18 @@ alarm_->Sleep(duration); } +std::unique_ptr<rtc::Thread> ExternalTimeController::CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) { + RTC_NOTREACHED(); + return nullptr; +} + +rtc::Thread* ExternalTimeController::GetMainThread() { + RTC_NOTREACHED(); + return nullptr; +} + std::unique_ptr<TaskQueueBase, TaskQueueDeleter> ExternalTimeController::CreateTaskQueue( absl::string_view name,
diff --git a/test/time_controller/external_time_controller.h b/test/time_controller/external_time_controller.h index 869a78f..dfd7712 100644 --- a/test/time_controller/external_time_controller.h +++ b/test/time_controller/external_time_controller.h
@@ -38,6 +38,10 @@ std::unique_ptr<ProcessThread> CreateProcessThread( const char* thread_name) override; void AdvanceTime(TimeDelta duration) override; + std::unique_ptr<rtc::Thread> CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) override; + rtc::Thread* GetMainThread() override; // Implementation of TaskQueueFactory. std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
diff --git a/test/time_controller/real_time_controller.cc b/test/time_controller/real_time_controller.cc index 732f1bd..d9fd9dc 100644 --- a/test/time_controller/real_time_controller.cc +++ b/test/time_controller/real_time_controller.cc
@@ -10,10 +10,10 @@ #include "test/time_controller/real_time_controller.h" #include "api/task_queue/default_task_queue_factory.h" +#include "rtc_base/null_socket_server.h" #include "system_wrappers/include/sleep.h" namespace webrtc { - RealTimeController::RealTimeController() : task_queue_factory_(CreateDefaultTaskQueueFactory()) {} @@ -30,8 +30,23 @@ return ProcessThread::Create(thread_name); } +std::unique_ptr<rtc::Thread> RealTimeController::CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) { + if (!socket_server) + socket_server = std::make_unique<rtc::NullSocketServer>(); + auto res = std::make_unique<rtc::Thread>(std::move(socket_server)); + res->SetName(name, nullptr); + res->Start(); + return res; +} + +rtc::Thread* RealTimeController::GetMainThread() { + return rtc::Thread::Current(); +} + void RealTimeController::AdvanceTime(TimeDelta duration) { - SleepMs(duration.ms()); + GetMainThread()->ProcessMessages(duration.ms()); } RealTimeController* GlobalRealTimeController() {
diff --git a/test/time_controller/real_time_controller.h b/test/time_controller/real_time_controller.h index 873ef90..f68fe44 100644 --- a/test/time_controller/real_time_controller.h +++ b/test/time_controller/real_time_controller.h
@@ -28,10 +28,14 @@ TaskQueueFactory* GetTaskQueueFactory() override; std::unique_ptr<ProcessThread> CreateProcessThread( const char* thread_name) override; + std::unique_ptr<rtc::Thread> CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) override; + rtc::Thread* GetMainThread() override; void AdvanceTime(TimeDelta duration) override; private: - std::unique_ptr<TaskQueueFactory> task_queue_factory_; + const std::unique_ptr<TaskQueueFactory> task_queue_factory_; }; RealTimeController* GlobalRealTimeController();
diff --git a/test/time_controller/simulated_thread.cc b/test/time_controller/simulated_thread.cc new file mode 100644 index 0000000..8d82ebd --- /dev/null +++ b/test/time_controller/simulated_thread.cc
@@ -0,0 +1,135 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include "test/time_controller/simulated_thread.h" + +#include <algorithm> +#include <utility> + +#include "rtc_base/task_utils/to_queued_task.h" + +namespace webrtc { +namespace { + +// A socket server that does nothing. It's different from NullSocketServer in +// that it does allow sleep/wakeup. This avoids usage of an Event instance which +// otherwise would cause issues with the simulated Yeild behavior. +class DummySocketServer : public rtc::SocketServer { + public: + rtc::Socket* CreateSocket(int family, int type) override { + RTC_NOTREACHED(); + return nullptr; + } + rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override { + RTC_NOTREACHED(); + return nullptr; + } + bool Wait(int cms, bool process_io) override { + RTC_CHECK_EQ(cms, 0); + return true; + } + void WakeUp() override {} +}; + +} // namespace + +SimulatedThread::SimulatedThread( + sim_time_impl::SimulatedTimeControllerImpl* handler, + absl::string_view name, + std::unique_ptr<rtc::SocketServer> socket_server) + : rtc::Thread(socket_server ? std::move(socket_server) + : std::make_unique<DummySocketServer>()), + handler_(handler), + name_(new char[name.size()]) { + std::copy_n(name.begin(), name.size(), name_); +} + +SimulatedThread::~SimulatedThread() { + handler_->Unregister(this); + delete[] name_; +} + +void SimulatedThread::RunReady(Timestamp at_time) { + CurrentThreadSetter set_current(this); + ProcessMessages(0); + int delay_ms = GetDelay(); + rtc::CritScope lock(&lock_); + if (delay_ms == kForever) { + next_run_time_ = Timestamp::PlusInfinity(); + } else { + next_run_time_ = at_time + TimeDelta::ms(delay_ms); + } +} + +void SimulatedThread::Send(const rtc::Location& posted_from, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) { + if (IsQuitting()) + return; + rtc::Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (IsCurrent()) { + msg.phandler->OnMessage(&msg); + } else { + CurrentThreadSetter set_current(this); + msg.phandler->OnMessage(&msg); + } +} + +void SimulatedThread::Post(const rtc::Location& posted_from, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata, + bool time_sensitive) { + rtc::Thread::Post(posted_from, phandler, id, pdata, time_sensitive); + rtc::CritScope lock(&lock_); + next_run_time_ = Timestamp::MinusInfinity(); +} + +void SimulatedThread::PostDelayed(const rtc::Location& posted_from, + int delay_ms, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) { + rtc::Thread::PostDelayed(posted_from, delay_ms, phandler, id, pdata); + rtc::CritScope lock(&lock_); + next_run_time_ = + std::min(next_run_time_, Timestamp::ms(rtc::TimeMillis() + delay_ms)); +} + +void SimulatedThread::PostAt(const rtc::Location& posted_from, + int64_t target_time_ms, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) { + rtc::Thread::PostAt(posted_from, target_time_ms, phandler, id, pdata); + rtc::CritScope lock(&lock_); + next_run_time_ = std::min(next_run_time_, Timestamp::ms(target_time_ms)); +} + +void SimulatedThread::Stop() { + Thread::Quit(); +} + +SimulatedMainThread::SimulatedMainThread( + sim_time_impl::SimulatedTimeControllerImpl* handler) + : SimulatedThread(handler, "main", nullptr), current_setter_(this) {} + +SimulatedMainThread::~SimulatedMainThread() { + // Removes pending tasks in case they keep shared pointer references to + // objects whose destructor expects to run before the Thread destructor. + Stop(); + DoDestroy(); +} + +} // namespace webrtc
diff --git a/test/time_controller/simulated_thread.h b/test/time_controller/simulated_thread.h new file mode 100644 index 0000000..bbaafd7 --- /dev/null +++ b/test/time_controller/simulated_thread.h
@@ -0,0 +1,91 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef TEST_TIME_CONTROLLER_SIMULATED_THREAD_H_ +#define TEST_TIME_CONTROLLER_SIMULATED_THREAD_H_ + +#include <memory> + +#include "test/time_controller/simulated_time_controller.h" + +namespace webrtc { + +class SimulatedThread : public rtc::Thread, + public sim_time_impl::SimulatedSequenceRunner { + public: + class CurrentThreadSetter : CurrentTaskQueueSetter { + public: + explicit CurrentThreadSetter(Thread* thread) + : CurrentTaskQueueSetter(thread), + manager_(rtc::ThreadManager::Instance()), + previous_(manager_->CurrentThread()) { + manager_->ChangeCurrentThreadForTest(thread); + } + ~CurrentThreadSetter() { manager_->ChangeCurrentThreadForTest(previous_); } + + private: + rtc::ThreadManager* const manager_; + rtc::Thread* const previous_; + }; + SimulatedThread(sim_time_impl::SimulatedTimeControllerImpl* handler, + absl::string_view name, + std::unique_ptr<rtc::SocketServer> socket_server); + ~SimulatedThread() override; + + void RunReady(Timestamp at_time) override; + + Timestamp GetNextRunTime() const override { + rtc::CritScope lock(&lock_); + return next_run_time_; + } + + TaskQueueBase* GetAsTaskQueue() override { return this; } + + // Thread interface + void Send(const rtc::Location& posted_from, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) override; + void Post(const rtc::Location& posted_from, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata, + bool time_sensitive) override; + void PostDelayed(const rtc::Location& posted_from, + int delay_ms, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) override; + void PostAt(const rtc::Location& posted_from, + int64_t target_time_ms, + rtc::MessageHandler* phandler, + uint32_t id, + rtc::MessageData* pdata) override; + + void Stop() override; + + private: + sim_time_impl::SimulatedTimeControllerImpl* const handler_; + // Using char* to be debugger friendly. + char* name_; + rtc::CriticalSection lock_; + Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity(); +}; + +class SimulatedMainThread : public SimulatedThread { + public: + explicit SimulatedMainThread( + sim_time_impl::SimulatedTimeControllerImpl* handler); + ~SimulatedMainThread(); + + private: + CurrentThreadSetter current_setter_; +}; +} // namespace webrtc +#endif // TEST_TIME_CONTROLLER_SIMULATED_THREAD_H_
diff --git a/test/time_controller/simulated_time_controller.cc b/test/time_controller/simulated_time_controller.cc index 3bdce8d..d3bc66a 100644 --- a/test/time_controller/simulated_time_controller.cc +++ b/test/time_controller/simulated_time_controller.cc
@@ -20,6 +20,7 @@ #include "absl/strings/string_view.h" #include "test/time_controller/simulated_process_thread.h" #include "test/time_controller/simulated_task_queue.h" +#include "test/time_controller/simulated_thread.h" namespace webrtc { namespace { @@ -49,8 +50,8 @@ auto mutable_this = const_cast<SimulatedTimeControllerImpl*>(this); auto task_queue = std::unique_ptr<SimulatedTaskQueue, TaskQueueDeleter>( new SimulatedTaskQueue(mutable_this, name)); - rtc::CritScope lock(&mutable_this->lock_); - mutable_this->runners_.push_back(task_queue.get()); + ; + mutable_this->Register(task_queue.get()); return task_queue; } @@ -59,10 +60,19 @@ rtc::CritScope lock(&lock_); auto process_thread = std::make_unique<SimulatedProcessThread>(this, thread_name); - runners_.push_back(process_thread.get()); + Register(process_thread.get()); return process_thread; } +std::unique_ptr<rtc::Thread> SimulatedTimeControllerImpl::CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) { + auto thread = + std::make_unique<SimulatedThread>(this, name, std::move(socket_server)); + Register(thread.get()); + return thread; +} + void SimulatedTimeControllerImpl::YieldExecution() { if (rtc::CurrentThreadId() == thread_id_) { TaskQueueBase* yielding_from = TaskQueueBase::Current(); @@ -83,6 +93,9 @@ } void SimulatedTimeControllerImpl::RunReadyRunners() { + // Using a dummy thread rather than nullptr to avoid implicit thread creation + // by Thread::Current(). + SimulatedThread::CurrentThreadSetter set_current(dummy_thread_.get()); rtc::CritScope lock(&lock_); RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_); Timestamp current_time = CurrentTime(); @@ -136,6 +149,11 @@ current_time_ = target_time; } +void SimulatedTimeControllerImpl::Register(SimulatedSequenceRunner* runner) { + rtc::CritScope lock(&lock_); + runners_.push_back(runner); +} + void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) { rtc::CritScope lock(&lock_); bool removed = RemoveByValue(&runners_, runner); @@ -148,6 +166,9 @@ Timestamp start_time) : sim_clock_(start_time.us()), impl_(start_time), yield_policy_(&impl_) { global_clock_.SetTime(start_time); + auto main_thread = std::make_unique<SimulatedMainThread>(&impl_); + impl_.Register(main_thread.get()); + main_thread_ = std::move(main_thread); } GlobalSimulatedTimeController::~GlobalSimulatedTimeController() = default; @@ -165,6 +186,16 @@ return impl_.CreateProcessThread(thread_name); } +std::unique_ptr<rtc::Thread> GlobalSimulatedTimeController::CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) { + return impl_.CreateThread(name, std::move(socket_server)); +} + +rtc::Thread* GlobalSimulatedTimeController::GetMainThread() { + return main_thread_.get(); +} + void GlobalSimulatedTimeController::AdvanceTime(TimeDelta duration) { rtc::ScopedYieldPolicy yield_policy(&impl_); Timestamp current_time = impl_.CurrentTime();
diff --git a/test/time_controller/simulated_time_controller.h b/test/time_controller/simulated_time_controller.h index e9f91b6..783edb2 100644 --- a/test/time_controller/simulated_time_controller.h +++ b/test/time_controller/simulated_time_controller.h
@@ -60,6 +60,10 @@ void YieldExecution() override; // Create process thread with the name |thread_name|. std::unique_ptr<ProcessThread> CreateProcessThread(const char* thread_name); + // Create thread using provided |socket_server|. + std::unique_ptr<rtc::Thread> CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server); // Runs all runners in |runners_| that has tasks or modules ready for // execution. @@ -70,11 +74,14 @@ Timestamp NextRunTime() const; // Set |current_time_| to |target_time|. void AdvanceTime(Timestamp target_time); + // Adds |runner| to |runners_|. + void Register(SimulatedSequenceRunner* runner); // Removes |runner| from |runners_|. void Unregister(SimulatedSequenceRunner* runner); private: const rtc::PlatformThreadId thread_id_; + const std::unique_ptr<rtc::Thread> dummy_thread_ = rtc::Thread::Create(); rtc::CriticalSection time_lock_; Timestamp current_time_ RTC_GUARDED_BY(time_lock_); rtc::CriticalSection lock_; @@ -119,6 +126,10 @@ TaskQueueFactory* GetTaskQueueFactory() override; std::unique_ptr<ProcessThread> CreateProcessThread( const char* thread_name) override; + std::unique_ptr<rtc::Thread> CreateThread( + const std::string& name, + std::unique_ptr<rtc::SocketServer> socket_server) override; + rtc::Thread* GetMainThread() override; void AdvanceTime(TimeDelta duration) override; @@ -128,6 +139,7 @@ SimulatedClock sim_clock_; sim_time_impl::SimulatedTimeControllerImpl impl_; rtc::ScopedYieldPolicy yield_policy_; + std::unique_ptr<rtc::Thread> main_thread_; }; } // namespace webrtc