Delete ProcessThread and related Module interface
Bug: webrtc:7219
Change-Id: Id71430a24b21e591494557cf54419d2bc8b3f8c6
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267400
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Auto-Submit: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37416}
diff --git a/audio/BUILD.gn b/audio/BUILD.gn
index 3be1cad..3efd00a 100644
--- a/audio/BUILD.gn
+++ b/audio/BUILD.gn
@@ -186,7 +186,6 @@
"../modules/pacing",
"../modules/rtp_rtcp:mock_rtp_rtcp",
"../modules/rtp_rtcp:rtp_rtcp_format",
- "../modules/utility",
"../rtc_base:checks",
"../rtc_base:macromagic",
"../rtc_base:refcount",
diff --git a/audio/voip/BUILD.gn b/audio/voip/BUILD.gn
index b939733..a70d1c3 100644
--- a/audio/voip/BUILD.gn
+++ b/audio/voip/BUILD.gn
@@ -23,7 +23,6 @@
"../../modules/audio_device:audio_device_api",
"../../modules/audio_mixer:audio_mixer_impl",
"../../modules/audio_processing:api",
- "../../modules/utility:utility",
"../../rtc_base:criticalsection",
"../../rtc_base:logging",
"../../rtc_base/synchronization:mutex",
@@ -46,7 +45,6 @@
"../../modules/audio_device:audio_device_api",
"../../modules/rtp_rtcp",
"../../modules/rtp_rtcp:rtp_rtcp_format",
- "../../modules/utility",
"../../rtc_base:criticalsection",
"../../rtc_base:location",
"../../rtc_base:logging",
@@ -71,7 +69,6 @@
"../../modules/audio_coding",
"../../modules/rtp_rtcp",
"../../modules/rtp_rtcp:rtp_rtcp_format",
- "../../modules/utility",
"../../rtc_base:criticalsection",
"../../rtc_base:logging",
"../../rtc_base:safe_minmax",
diff --git a/audio/voip/test/BUILD.gn b/audio/voip/test/BUILD.gn
index a2cc1c5..d2ae985 100644
--- a/audio/voip/test/BUILD.gn
+++ b/audio/voip/test/BUILD.gn
@@ -30,7 +30,6 @@
"../../../api/task_queue:default_task_queue_factory",
"../../../modules/audio_device:mock_audio_device",
"../../../modules/audio_processing:mocks",
- "../../../modules/utility:mock_process_thread",
"../../../test:audio_codec_mocks",
"../../../test:mock_transport",
"../../../test:run_loop",
@@ -53,7 +52,6 @@
"../../../modules/audio_mixer:audio_mixer_test_utils",
"../../../modules/rtp_rtcp:rtp_rtcp",
"../../../modules/rtp_rtcp:rtp_rtcp_format",
- "../../../modules/utility",
"../../../rtc_base:logging",
"../../../test:mock_transport",
"../../../test:test_support",
diff --git a/examples/androidvoip/BUILD.gn b/examples/androidvoip/BUILD.gn
index 3120e06..f7f0d90 100644
--- a/examples/androidvoip/BUILD.gn
+++ b/examples/androidvoip/BUILD.gn
@@ -67,7 +67,6 @@
"//api/task_queue:default_task_queue_factory",
"//api/voip:voip_api",
"//api/voip:voip_engine_factory",
- "//modules/utility:utility",
"//rtc_base",
"//rtc_base/third_party/sigslot:sigslot",
"//sdk/android:native_api_audio_device_module",
diff --git a/modules/BUILD.gn b/modules/BUILD.gn
index fd408ab..3ad13b2 100644
--- a/modules/BUILD.gn
+++ b/modules/BUILD.gn
@@ -36,10 +36,7 @@
rtc_source_set("module_api") {
visibility = [ "*" ]
- sources = [
- "include/module.h",
- "include/module_common_types.h",
- ]
+ sources = [ "include/module_common_types.h" ]
}
rtc_source_set("module_fec_api") {
@@ -221,7 +218,6 @@
"pacing:pacing_unittests",
"remote_bitrate_estimator:remote_bitrate_estimator_unittests",
"rtp_rtcp:rtp_rtcp_unittests",
- "utility:utility_unittests",
"video_coding:video_coding_unittests",
"video_coding/timing:timing_unittests",
"video_processing:video_processing_unittests",
diff --git a/modules/audio_device/BUILD.gn b/modules/audio_device/BUILD.gn
index b376955b..e565b5a 100644
--- a/modules/audio_device/BUILD.gn
+++ b/modules/audio_device/BUILD.gn
@@ -433,7 +433,6 @@
"../../system_wrappers",
"../../test:fileutils",
"../../test:test_support",
- "../utility",
]
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
if (is_linux || is_chromeos || is_mac || is_win) {
@@ -460,6 +459,7 @@
"../../sdk/android:libjingle_peerconnection_java",
"../../sdk/android:native_api_jni",
"../../sdk/android:native_test_jni_onload",
+ "../utility",
]
}
if (!rtc_include_internal_audio_device) {
diff --git a/modules/congestion_controller/goog_cc/BUILD.gn b/modules/congestion_controller/goog_cc/BUILD.gn
index 00cd2d5..e417405 100644
--- a/modules/congestion_controller/goog_cc/BUILD.gn
+++ b/modules/congestion_controller/goog_cc/BUILD.gn
@@ -30,7 +30,6 @@
":probe_controller",
":pushback_controller",
":send_side_bwe",
- "../..:module_api",
"../../../api:field_trials_view",
"../../../api:network_state_predictor_api",
"../../../api/rtc_event_log",
diff --git a/modules/include/module.h b/modules/include/module.h
deleted file mode 100644
index 3046390..0000000
--- a/modules/include/module.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_INCLUDE_MODULE_H_
-#define MODULES_INCLUDE_MODULE_H_
-
-#include <stdint.h>
-
-namespace webrtc {
-
-class ProcessThread;
-
-class Module {
- public:
- // Returns the number of milliseconds until the module wants a worker
- // thread to call Process.
- // This method is called on the same worker thread as Process will
- // be called on.
- // TODO(tommi): Almost all implementations of this function, need to know
- // the current tick count. Consider passing it as an argument. It could
- // also improve the accuracy of when the next callback occurs since the
- // thread that calls Process() will also have it's tick count reference
- // which might not match with what the implementations use.
- virtual int64_t TimeUntilNextProcess() = 0;
-
- // Process any pending tasks such as timeouts.
- // Called on a worker thread.
- virtual void Process() = 0;
-
- // This method is called when the module is attached to a *running* process
- // thread or detached from one. In the case of detaching, `process_thread`
- // will be nullptr.
- //
- // This method will be called in the following cases:
- //
- // * Non-null process_thread:
- // * ProcessThread::RegisterModule() is called while the thread is running.
- // * ProcessThread::Start() is called and RegisterModule has previously
- // been called. The thread will be started immediately after notifying
- // all modules.
- //
- // * Null process_thread:
- // * ProcessThread::DeRegisterModule() is called while the thread is
- // running.
- // * ProcessThread::Stop() was called and the thread has been stopped.
- //
- // NOTE: This method is not called from the worker thread itself, but from
- // the thread that registers/deregisters the module or calls Start/Stop.
- virtual void ProcessThreadAttached(ProcessThread* process_thread) {}
-
- protected:
- virtual ~Module() {}
-};
-} // namespace webrtc
-
-#endif // MODULES_INCLUDE_MODULE_H_
diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn
index 3095df9..b424e2c 100644
--- a/modules/pacing/BUILD.gn
+++ b/modules/pacing/BUILD.gn
@@ -32,7 +32,6 @@
deps = [
":interval_budget",
- "..:module_api",
"../../api:field_trials_view",
"../../api:field_trials_view",
"../../api:function_view",
@@ -63,7 +62,6 @@
"../../system_wrappers:metrics",
"../rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",
- "../utility",
]
absl_deps = [
"//third_party/abseil-cpp/absl/memory",
@@ -106,7 +104,6 @@
"../../api/units:data_rate",
"../../api/units:time_delta",
"../../api/units:timestamp",
- "../../modules/utility:mock_process_thread",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_tests_utils",
"../../rtc_base/experiments:alr_experiment",
diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc
index 44316ef..926f4d9 100644
--- a/modules/pacing/task_queue_paced_sender_unittest.cc
+++ b/modules/pacing/task_queue_paced_sender_unittest.cc
@@ -23,7 +23,6 @@
#include "api/transport/network_types.h"
#include "api/units/data_rate.h"
#include "modules/pacing/packet_router.h"
-#include "modules/utility/include/mock/mock_process_thread.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/scoped_key_value_config.h"
diff --git a/modules/utility/BUILD.gn b/modules/utility/BUILD.gn
index fc8512e..2b560943 100644
--- a/modules/utility/BUILD.gn
+++ b/modules/utility/BUILD.gn
@@ -8,68 +8,29 @@
import("../../webrtc.gni")
-rtc_library("utility") {
- visibility = [ "*" ]
- sources = [
- "include/process_thread.h",
- "source/process_thread_impl.cc",
- "source/process_thread_impl.h",
- ]
+if (is_android) {
+ rtc_library("utility") {
+ visibility = [ "*" ]
- if (is_android) {
- sources += [
+ sources = [
"include/helpers_android.h",
"include/jvm_android.h",
"source/helpers_android.cc",
"source/jvm_android.cc",
]
- }
- if (is_ios) {
- frameworks = [ "AVFoundation.framework" ]
- }
-
- deps = [
- "..:module_api",
- "../../api:sequence_checker",
- "../../api/task_queue",
- "../../common_audio",
- "../../rtc_base:checks",
- "../../rtc_base:event_tracer",
- "../../rtc_base:location",
- "../../rtc_base:logging",
- "../../rtc_base:platform_thread",
- "../../rtc_base:rtc_event",
- "../../rtc_base:timeutils",
- "../../rtc_base/system:arch",
- "../../system_wrappers",
- ]
-}
-
-rtc_library("mock_process_thread") {
- testonly = true
- visibility = [ "*" ]
- sources = [ "include/mock/mock_process_thread.h" ]
- deps = [
- ":utility",
- "../../rtc_base:location",
- "../../test:test_support",
- ]
-}
-
-if (rtc_include_tests) {
- rtc_library("utility_unittests") {
- testonly = true
-
- sources = [ "source/process_thread_impl_unittest.cc" ]
deps = [
- ":utility",
- "..:module_api",
- "../../api/task_queue",
- "../../api/task_queue:task_queue_test",
- "../../rtc_base:location",
- "../../rtc_base:timeutils",
- "../../test:test_support",
+ "../../api:sequence_checker",
+ "../../rtc_base:checks",
+ "../../rtc_base:logging",
+ "../../rtc_base:platform_thread",
+ "../../rtc_base/system:arch",
]
}
+} else {
+ # Add an empty source set so that dependent targets may include utility
+ # unconditionally.
+ rtc_source_set("utility") {
+ visibility = [ "*" ]
+ }
}
diff --git a/modules/utility/include/mock/mock_process_thread.h b/modules/utility/include/mock/mock_process_thread.h
deleted file mode 100644
index e356bca..0000000
--- a/modules/utility/include/mock/mock_process_thread.h
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (c) 2014 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_
-#define MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_
-
-#include <memory>
-
-#include "modules/utility/include/process_thread.h"
-#include "rtc_base/location.h"
-#include "test/gmock.h"
-
-namespace webrtc {
-
-class MockProcessThread : public ProcessThread {
- public:
- MOCK_METHOD(void, Start, (), (override));
- MOCK_METHOD(void, Stop, (), (override));
- MOCK_METHOD(void, Delete, (), (override));
- MOCK_METHOD(void, WakeUp, (Module*), (override));
- MOCK_METHOD(void, PostTask, (std::unique_ptr<QueuedTask>), (override));
- MOCK_METHOD(void,
- PostDelayedTask,
- (std::unique_ptr<QueuedTask>, uint32_t),
- (override));
- MOCK_METHOD(void,
- RegisterModule,
- (Module*, const rtc::Location&),
- (override));
- MOCK_METHOD(void, DeRegisterModule, (Module*), (override));
-};
-
-} // namespace webrtc
-#endif // MODULES_UTILITY_INCLUDE_MOCK_MOCK_PROCESS_THREAD_H_
diff --git a/modules/utility/include/process_thread.h b/modules/utility/include/process_thread.h
deleted file mode 100644
index 7786dac..0000000
--- a/modules/utility/include/process_thread.h
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_
-#define MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_
-
-#include <memory>
-
-#include "api/task_queue/queued_task.h"
-#include "api/task_queue/task_queue_base.h"
-
-namespace rtc {
-class Location;
-}
-
-namespace webrtc {
-class Module;
-
-// TODO(tommi): ProcessThread probably doesn't need to be a virtual
-// interface. There exists one override besides ProcessThreadImpl,
-// MockProcessThread, but when looking at how it is used, it seems
-// a nullptr might suffice (or simply an actual ProcessThread instance).
-class ProcessThread : public TaskQueueBase {
- public:
- ~ProcessThread() override;
-
- static std::unique_ptr<ProcessThread> Create(const char* thread_name);
-
- // Starts the worker thread. Must be called from the construction thread.
- virtual void Start() = 0;
-
- // Stops the worker thread. Must be called from the construction thread.
- virtual void Stop() = 0;
-
- // Wakes the thread up to give a module a chance to do processing right
- // away. This causes the worker thread to wake up and requery the specified
- // module for when it should be called back. (Typically the module should
- // return 0 from TimeUntilNextProcess on the worker thread at that point).
- // Can be called on any thread.
- virtual void WakeUp(Module* module) = 0;
-
- // Adds a module that will start to receive callbacks on the worker thread.
- // Can be called from any thread.
- virtual void RegisterModule(Module* module, const rtc::Location& from) = 0;
-
- // Removes a previously registered module.
- // Can be called from any thread.
- virtual void DeRegisterModule(Module* module) = 0;
-};
-
-} // namespace webrtc
-
-#endif // MODULES_UTILITY_INCLUDE_PROCESS_THREAD_H_
diff --git a/modules/utility/source/process_thread_impl.cc b/modules/utility/source/process_thread_impl.cc
deleted file mode 100644
index 2274aae..0000000
--- a/modules/utility/source/process_thread_impl.cc
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/utility/source/process_thread_impl.h"
-
-#include <string>
-
-#include "modules/include/module.h"
-#include "rtc_base/checks.h"
-#include "rtc_base/logging.h"
-#include "rtc_base/time_utils.h"
-#include "rtc_base/trace_event.h"
-
-namespace webrtc {
-namespace {
-
-// We use this constant internally to signal that a module has requested
-// a callback right away. When this is set, no call to TimeUntilNextProcess
-// should be made, but Process() should be called directly.
-const int64_t kCallProcessImmediately = -1;
-
-int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
- int64_t interval = module->TimeUntilNextProcess();
- if (interval < 0) {
- // Falling behind, we should call the callback now.
- return time_now;
- }
- return time_now + interval;
-}
-} // namespace
-
-ProcessThread::~ProcessThread() {}
-
-// static
-std::unique_ptr<ProcessThread> ProcessThread::Create(const char* thread_name) {
- return std::unique_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
-}
-
-ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
- : stop_(false), thread_name_(thread_name) {}
-
-ProcessThreadImpl::~ProcessThreadImpl() {
- RTC_DCHECK(thread_checker_.IsCurrent());
- RTC_DCHECK(!stop_);
-
- while (!delayed_tasks_.empty()) {
- delete delayed_tasks_.top().task;
- delayed_tasks_.pop();
- }
-
- while (!queue_.empty()) {
- delete queue_.front();
- queue_.pop();
- }
-}
-
-void ProcessThreadImpl::Delete() {
- RTC_LOG(LS_WARNING) << "Process thread " << thread_name_
- << " is destroyed as a TaskQueue.";
- Stop();
- delete this;
-}
-
-// Doesn't need locking, because the contending thread isn't running.
-void ProcessThreadImpl::Start() RTC_NO_THREAD_SAFETY_ANALYSIS {
- RTC_DCHECK(thread_checker_.IsCurrent());
- RTC_DCHECK(thread_.empty());
- if (!thread_.empty())
- return;
-
- RTC_DCHECK(!stop_);
-
- for (ModuleCallback& m : modules_)
- m.module->ProcessThreadAttached(this);
-
- thread_ = rtc::PlatformThread::SpawnJoinable(
- [this] {
- CurrentTaskQueueSetter set_current(this);
- while (Process()) {
- }
- },
- thread_name_);
-}
-
-void ProcessThreadImpl::Stop() {
- RTC_DCHECK(thread_checker_.IsCurrent());
- if (thread_.empty())
- return;
-
- {
- // Need to take lock, for synchronization with `thread_`.
- MutexLock lock(&mutex_);
- stop_ = true;
- }
-
- wake_up_.Set();
- thread_.Finalize();
-
- StopNoLocks();
-}
-
-// No locking needed, since this is called after the contending thread is
-// stopped.
-void ProcessThreadImpl::StopNoLocks() RTC_NO_THREAD_SAFETY_ANALYSIS {
- RTC_DCHECK(thread_.empty());
- stop_ = false;
-
- for (ModuleCallback& m : modules_)
- m.module->ProcessThreadAttached(nullptr);
-}
-
-void ProcessThreadImpl::WakeUp(Module* module) {
- // Allowed to be called on any thread.
- auto holds_mutex = [this] {
- if (!IsCurrent()) {
- return false;
- }
- RTC_DCHECK_RUN_ON(this);
- return holds_mutex_;
- };
- if (holds_mutex()) {
- // Avoid locking if called on the ProcessThread, via a module's Process),
- WakeUpNoLocks(module);
- } else {
- MutexLock lock(&mutex_);
- WakeUpInternal(module);
- }
- wake_up_.Set();
-}
-
-// Must be called only indirectly from Process, which already holds the lock.
-void ProcessThreadImpl::WakeUpNoLocks(Module* module)
- RTC_NO_THREAD_SAFETY_ANALYSIS {
- RTC_DCHECK_RUN_ON(this);
- WakeUpInternal(module);
-}
-
-void ProcessThreadImpl::WakeUpInternal(Module* module) {
- for (ModuleCallback& m : modules_) {
- if (m.module == module)
- m.next_callback = kCallProcessImmediately;
- }
-}
-
-void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task) {
- // Allowed to be called on any thread, except from a module's Process method.
- if (IsCurrent()) {
- RTC_DCHECK_RUN_ON(this);
- RTC_DCHECK(!holds_mutex_) << "Calling ProcessThread::PostTask from "
- "Module::Process is not supported";
- }
- {
- MutexLock lock(&mutex_);
- queue_.push(task.release());
- }
- wake_up_.Set();
-}
-
-void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) {
- int64_t run_at_ms = rtc::TimeMillis() + milliseconds;
- bool recalculate_wakeup_time;
- {
- MutexLock lock(&mutex_);
- recalculate_wakeup_time =
- delayed_tasks_.empty() || run_at_ms < delayed_tasks_.top().run_at_ms;
- delayed_tasks_.emplace(run_at_ms, sequence_id_++, std::move(task));
- }
- if (recalculate_wakeup_time) {
- wake_up_.Set();
- }
-}
-
-void ProcessThreadImpl::RegisterModule(Module* module,
- const rtc::Location& from) {
- TRACE_EVENT0("webrtc", "ProcessThreadImpl::RegisterModule");
- RTC_DCHECK(thread_checker_.IsCurrent());
- RTC_DCHECK(module) << from.ToString();
-
-#if RTC_DCHECK_IS_ON
- {
- // Catch programmer error.
- MutexLock lock(&mutex_);
- for (const ModuleCallback& mc : modules_) {
- RTC_DCHECK(mc.module != module)
- << "Already registered here: " << mc.location.ToString()
- << "\n"
- "Now attempting from here: "
- << from.ToString();
- }
- }
-#endif
-
- // Now that we know the module isn't in the list, we'll call out to notify
- // the module that it's attached to the worker thread. We don't hold
- // the lock while we make this call.
- if (!thread_.empty())
- module->ProcessThreadAttached(this);
-
- {
- MutexLock lock(&mutex_);
- modules_.push_back(ModuleCallback(module, from));
- }
-
- // Wake the thread calling ProcessThreadImpl::Process() to update the
- // waiting time. The waiting time for the just registered module may be
- // shorter than all other registered modules.
- wake_up_.Set();
-}
-
-void ProcessThreadImpl::DeRegisterModule(Module* module) {
- RTC_DCHECK(thread_checker_.IsCurrent());
- RTC_DCHECK(module);
-
- {
- MutexLock lock(&mutex_);
- modules_.remove_if(
- [&module](const ModuleCallback& m) { return m.module == module; });
- }
-
- // Notify the module that it's been detached.
- module->ProcessThreadAttached(nullptr);
-}
-
-bool ProcessThreadImpl::Process() {
- TRACE_EVENT1("webrtc", "ProcessThreadImpl", "name", thread_name_);
- int64_t now = rtc::TimeMillis();
- int64_t next_checkpoint = now + (1000 * 60);
- RTC_DCHECK_RUN_ON(this);
- {
- MutexLock lock(&mutex_);
- if (stop_)
- return false;
- for (ModuleCallback& m : modules_) {
- // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
- // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
- // operation should not require taking a lock, so querying all modules
- // should run in a matter of nanoseconds.
- if (m.next_callback == 0)
- m.next_callback = GetNextCallbackTime(m.module, now);
-
- // Set to true for the duration of the calls to modules' Process().
- holds_mutex_ = true;
- if (m.next_callback <= now ||
- m.next_callback == kCallProcessImmediately) {
- {
- TRACE_EVENT2("webrtc", "ModuleProcess", "function",
- m.location.function_name(), "file",
- m.location.file_name());
- m.module->Process();
- }
- // Use a new 'now' reference to calculate when the next callback
- // should occur. We'll continue to use 'now' above for the baseline
- // of calculating how long we should wait, to reduce variance.
- int64_t new_now = rtc::TimeMillis();
- m.next_callback = GetNextCallbackTime(m.module, new_now);
- }
- holds_mutex_ = false;
-
- if (m.next_callback < next_checkpoint)
- next_checkpoint = m.next_callback;
- }
-
- while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
- queue_.push(delayed_tasks_.top().task);
- delayed_tasks_.pop();
- }
-
- if (!delayed_tasks_.empty()) {
- next_checkpoint =
- std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
- }
-
- while (!queue_.empty()) {
- QueuedTask* task = queue_.front();
- queue_.pop();
- mutex_.Unlock();
- if (task->Run()) {
- delete task;
- }
- mutex_.Lock();
- }
- }
-
- int64_t time_to_wait = next_checkpoint - rtc::TimeMillis();
- if (time_to_wait > 0)
- wake_up_.Wait(static_cast<int>(time_to_wait));
-
- return true;
-}
-} // namespace webrtc
diff --git a/modules/utility/source/process_thread_impl.h b/modules/utility/source/process_thread_impl.h
deleted file mode 100644
index 0dc7aff..0000000
--- a/modules/utility/source/process_thread_impl.h
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (c) 2011 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#ifndef MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
-#define MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
-
-#include <stdint.h>
-
-#include <list>
-#include <memory>
-#include <queue>
-
-#include "api/sequence_checker.h"
-#include "api/task_queue/queued_task.h"
-#include "modules/include/module.h"
-#include "modules/utility/include/process_thread.h"
-#include "rtc_base/event.h"
-#include "rtc_base/location.h"
-#include "rtc_base/platform_thread.h"
-
-namespace webrtc {
-
-class ProcessThreadImpl : public ProcessThread {
- public:
- explicit ProcessThreadImpl(const char* thread_name);
- ~ProcessThreadImpl() override;
-
- void Start() override;
- void Stop() override;
-
- void WakeUp(Module* module) override;
- void PostTask(std::unique_ptr<QueuedTask> task) override;
- void PostDelayedTask(std::unique_ptr<QueuedTask> task,
- uint32_t milliseconds) override;
-
- void RegisterModule(Module* module, const rtc::Location& from) override;
- void DeRegisterModule(Module* module) override;
-
- protected:
- bool Process();
-
- private:
- struct ModuleCallback {
- ModuleCallback() = delete;
- ModuleCallback(ModuleCallback&& cb) = default;
- ModuleCallback(const ModuleCallback& cb) = default;
- ModuleCallback(Module* module, const rtc::Location& location)
- : module(module), location(location) {}
- bool operator==(const ModuleCallback& cb) const {
- return cb.module == module;
- }
-
- Module* const module;
- int64_t next_callback = 0; // Absolute timestamp.
- const rtc::Location location;
-
- private:
- ModuleCallback& operator=(ModuleCallback&);
- };
- struct DelayedTask {
- DelayedTask(int64_t run_at_ms,
- uint64_t sequence_id,
- std::unique_ptr<QueuedTask> task)
- : run_at_ms(run_at_ms),
- sequence_id_(sequence_id),
- task(task.release()) {}
- friend bool operator<(const DelayedTask& lhs, const DelayedTask& rhs) {
- // Earliest DelayedTask should be at the top of the priority queue.
- if (lhs.run_at_ms != rhs.run_at_ms) {
- return lhs.run_at_ms > rhs.run_at_ms;
- }
- return lhs.sequence_id_ > rhs.sequence_id_;
- }
-
- int64_t run_at_ms;
- uint64_t sequence_id_;
- // DelayedTask owns the `task`, but some delayed tasks must be removed from
- // the std::priority_queue, but mustn't be deleted. std::priority_queue does
- // not give non-const access to the values, so storing unique_ptr would
- // delete the task as soon as it is remove from the priority queue.
- // Thus lifetime of the `task` is managed manually.
- QueuedTask* task;
- };
- typedef std::list<ModuleCallback> ModuleList;
-
- void Delete() override;
- // The part of Stop processing that doesn't need any locking.
- void StopNoLocks();
- void WakeUpNoLocks(Module* module);
- void WakeUpInternal(Module* module) RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
-
- // Members protected by this mutex are accessed on the constructor thread and
- // on the spawned process thread, and locking is needed only while the process
- // thread is running.
- Mutex mutex_;
-
- SequenceChecker thread_checker_;
- rtc::Event wake_up_;
- rtc::PlatformThread thread_;
-
- ModuleList modules_ RTC_GUARDED_BY(mutex_);
- // Set to true when calling Process, to allow reentrant calls to WakeUp.
- bool holds_mutex_ RTC_GUARDED_BY(this) = false;
- std::queue<QueuedTask*> queue_;
- // `std::priority_queue` does not guarantee stable sort. For delayed tasks
- // with the same wakeup time, use `sequence_id_` to ensure FIFO ordering.
- std::priority_queue<DelayedTask> delayed_tasks_ RTC_GUARDED_BY(mutex_);
- uint64_t sequence_id_ RTC_GUARDED_BY(mutex_) = 0;
- // The `stop_` flag is modified only by the construction thread, protected by
- // `thread_checker_`. It is read also by the spawned `thread_`. The latter
- // thread must take `mutex_` before access, and for thread safety, the
- // constructor thread needs to take `mutex_` when it modifies `stop_` and
- // `thread_` is running. Annotations like RTC_GUARDED_BY doesn't support this
- // usage pattern.
- bool stop_ RTC_GUARDED_BY(mutex_);
- const char* thread_name_;
-};
-
-} // namespace webrtc
-
-#endif // MODULES_UTILITY_SOURCE_PROCESS_THREAD_IMPL_H_
diff --git a/modules/utility/source/process_thread_impl_unittest.cc b/modules/utility/source/process_thread_impl_unittest.cc
deleted file mode 100644
index 1fef0b6..0000000
--- a/modules/utility/source/process_thread_impl_unittest.cc
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
- *
- * Use of this source code is governed by a BSD-style license
- * that can be found in the LICENSE file in the root of the source
- * tree. An additional intellectual property rights grant can be found
- * in the file PATENTS. All contributing project authors may
- * be found in the AUTHORS file in the root of the source tree.
- */
-
-#include "modules/utility/source/process_thread_impl.h"
-
-#include <memory>
-#include <utility>
-
-#include "api/task_queue/queued_task.h"
-#include "api/task_queue/task_queue_test.h"
-#include "modules/include/module.h"
-#include "rtc_base/location.h"
-#include "rtc_base/time_utils.h"
-#include "test/gmock.h"
-#include "test/gtest.h"
-
-namespace webrtc {
-
-using ::testing::_;
-using ::testing::DoAll;
-using ::testing::InSequence;
-using ::testing::Invoke;
-using ::testing::Return;
-using ::testing::SetArgPointee;
-
-// The length of time, in milliseconds, to wait for an event to become signaled.
-// Set to a fairly large value as there is quite a bit of variation on some
-// Windows bots.
-static const int kEventWaitTimeout = 500;
-
-class MockModule : public Module {
- public:
- MOCK_METHOD(int64_t, TimeUntilNextProcess, (), (override));
- MOCK_METHOD(void, Process, (), (override));
- MOCK_METHOD(void, ProcessThreadAttached, (ProcessThread*), (override));
-};
-
-class RaiseEventTask : public QueuedTask {
- public:
- RaiseEventTask(rtc::Event* event) : event_(event) {}
- bool Run() override {
- event_->Set();
- return true;
- }
-
- private:
- rtc::Event* event_;
-};
-
-ACTION_P(SetEvent, event) {
- event->Set();
-}
-
-ACTION_P(Increment, counter) {
- ++(*counter);
-}
-
-ACTION_P(SetTimestamp, ptr) {
- *ptr = rtc::TimeMillis();
-}
-
-TEST(ProcessThreadImpl, StartStop) {
- ProcessThreadImpl thread("ProcessThread");
- thread.Start();
- thread.Stop();
-}
-
-TEST(ProcessThreadImpl, MultipleStartStop) {
- ProcessThreadImpl thread("ProcessThread");
- for (int i = 0; i < 5; ++i) {
- thread.Start();
- thread.Stop();
- }
-}
-
-// Verifies that we get at least call back to Process() on the worker thread.
-TEST(ProcessThreadImpl, ProcessCall) {
- ProcessThreadImpl thread("ProcessThread");
- thread.Start();
-
- rtc::Event event;
-
- MockModule module;
- EXPECT_CALL(module, TimeUntilNextProcess())
- .WillOnce(Return(0))
- .WillRepeatedly(Return(1));
- EXPECT_CALL(module, Process())
- .WillOnce(DoAll(SetEvent(&event), Return()))
- .WillRepeatedly(Return());
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
-
- thread.RegisterModule(&module, RTC_FROM_HERE);
- EXPECT_TRUE(event.Wait(kEventWaitTimeout));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.Stop();
-}
-
-// Same as ProcessCall except the module is registered before the
-// call to Start().
-TEST(ProcessThreadImpl, ProcessCall2) {
- ProcessThreadImpl thread("ProcessThread");
- rtc::Event event;
-
- MockModule module;
- EXPECT_CALL(module, TimeUntilNextProcess())
- .WillOnce(Return(0))
- .WillRepeatedly(Return(1));
- EXPECT_CALL(module, Process())
- .WillOnce(DoAll(SetEvent(&event), Return()))
- .WillRepeatedly(Return());
-
- thread.RegisterModule(&module, RTC_FROM_HERE);
-
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
- thread.Start();
- EXPECT_TRUE(event.Wait(kEventWaitTimeout));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.Stop();
-}
-
-// Tests setting up a module for callbacks and then unregister that module.
-// After unregistration, we should not receive any further callbacks.
-TEST(ProcessThreadImpl, Deregister) {
- ProcessThreadImpl thread("ProcessThread");
- rtc::Event event;
-
- int process_count = 0;
- MockModule module;
- EXPECT_CALL(module, TimeUntilNextProcess())
- .WillOnce(Return(0))
- .WillRepeatedly(Return(1));
- EXPECT_CALL(module, Process())
- .WillOnce(DoAll(SetEvent(&event), Increment(&process_count), Return()))
- .WillRepeatedly(DoAll(Increment(&process_count), Return()));
-
- thread.RegisterModule(&module, RTC_FROM_HERE);
-
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
- thread.Start();
-
- EXPECT_TRUE(event.Wait(kEventWaitTimeout));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.DeRegisterModule(&module);
-
- EXPECT_GE(process_count, 1);
- int count_after_deregister = process_count;
-
- // We shouldn't get any more callbacks.
- EXPECT_FALSE(event.Wait(20));
- EXPECT_EQ(count_after_deregister, process_count);
- thread.Stop();
-}
-
-// Helper function for testing receiving a callback after a certain amount of
-// time. There's some variance of timing built into it to reduce chance of
-// flakiness on bots.
-void ProcessCallAfterAFewMs(int64_t milliseconds) {
- ProcessThreadImpl thread("ProcessThread");
- thread.Start();
-
- rtc::Event event;
-
- MockModule module;
- int64_t start_time = 0;
- int64_t called_time = 0;
- EXPECT_CALL(module, TimeUntilNextProcess())
- .WillOnce(DoAll(SetTimestamp(&start_time), Return(milliseconds)))
- .WillRepeatedly(Return(milliseconds));
- EXPECT_CALL(module, Process())
- .WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&event), Return()))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
- thread.RegisterModule(&module, RTC_FROM_HERE);
-
- // Add a buffer of 50ms due to slowness of some trybots
- // (e.g. win_drmemory_light)
- EXPECT_TRUE(event.Wait(milliseconds + 50));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.Stop();
-
- ASSERT_GT(start_time, 0);
- ASSERT_GT(called_time, 0);
- // Use >= instead of > since due to rounding and timer accuracy (or lack
- // thereof), can make the test run in "0"ms time.
- EXPECT_GE(called_time, start_time);
- // Check for an acceptable range.
- uint32_t diff = called_time - start_time;
- EXPECT_GE(diff, milliseconds - 15);
- EXPECT_LT(diff, milliseconds + 15);
-}
-
-// DISABLED for now since the virtual build bots are too slow :(
-// TODO(tommi): Fix.
-TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter5ms) {
- ProcessCallAfterAFewMs(5);
-}
-
-// DISABLED for now since the virtual build bots are too slow :(
-// TODO(tommi): Fix.
-TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter50ms) {
- ProcessCallAfterAFewMs(50);
-}
-
-// DISABLED for now since the virtual build bots are too slow :(
-// TODO(tommi): Fix.
-TEST(ProcessThreadImpl, DISABLED_ProcessCallAfter200ms) {
- ProcessCallAfterAFewMs(200);
-}
-
-// Runs callbacks with the goal of getting up to 50 callbacks within a second
-// (on average 1 callback every 20ms). On real hardware, we're usually pretty
-// close to that, but the test bots that run on virtual machines, will
-// typically be in the range 30-40 callbacks.
-// DISABLED for now since this can take up to 2 seconds to run on the slowest
-// build bots.
-// TODO(tommi): Fix.
-TEST(ProcessThreadImpl, DISABLED_Process50Times) {
- ProcessThreadImpl thread("ProcessThread");
- thread.Start();
-
- rtc::Event event;
-
- MockModule module;
- int callback_count = 0;
- // Ask for a callback after 20ms.
- EXPECT_CALL(module, TimeUntilNextProcess()).WillRepeatedly(Return(20));
- EXPECT_CALL(module, Process())
- .WillRepeatedly(DoAll(Increment(&callback_count), Return()));
-
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
- thread.RegisterModule(&module, RTC_FROM_HERE);
-
- EXPECT_TRUE(event.Wait(1000));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.Stop();
-
- printf("Callback count: %i\n", callback_count);
- // Check that we got called back up to 50 times.
- // Some of the try bots run on slow virtual machines, so the lower bound
- // is much more relaxed to avoid flakiness.
- EXPECT_GE(callback_count, 25);
- EXPECT_LE(callback_count, 50);
-}
-
-// Tests that we can wake up the worker thread to give us a callback right
-// away when we know the thread is sleeping.
-TEST(ProcessThreadImpl, WakeUp) {
- ProcessThreadImpl thread("ProcessThread");
- thread.Start();
-
- rtc::Event started;
- rtc::Event called;
-
- MockModule module;
- int64_t start_time;
- int64_t called_time;
-
- // Ask for a callback after 1000ms.
- // TimeUntilNextProcess will be called twice.
- // The first time we use it to get the thread into a waiting state.
- // Then we wake the thread and there should not be another call made to
- // TimeUntilNextProcess before Process() is called.
- // The second time TimeUntilNextProcess is then called, is after Process
- // has been called and we don't expect any more calls.
- EXPECT_CALL(module, TimeUntilNextProcess())
- .WillOnce(
- DoAll(SetTimestamp(&start_time), SetEvent(&started), Return(1000)))
- .WillOnce(Return(1000));
- EXPECT_CALL(module, Process())
- .WillOnce(DoAll(SetTimestamp(&called_time), SetEvent(&called), Return()))
- .WillRepeatedly(Return());
-
- EXPECT_CALL(module, ProcessThreadAttached(&thread)).Times(1);
- thread.RegisterModule(&module, RTC_FROM_HERE);
-
- EXPECT_TRUE(started.Wait(kEventWaitTimeout));
- thread.WakeUp(&module);
- EXPECT_TRUE(called.Wait(kEventWaitTimeout));
-
- EXPECT_CALL(module, ProcessThreadAttached(nullptr)).Times(1);
- thread.Stop();
-
- EXPECT_GE(called_time, start_time);
- uint32_t diff = called_time - start_time;
- // We should have been called back much quicker than 1sec.
- EXPECT_LE(diff, 100u);
-}
-
-// Tests that we can post a task that gets run straight away on the worker
-// thread.
-TEST(ProcessThreadImpl, PostTask) {
- ProcessThreadImpl thread("ProcessThread");
- rtc::Event task_ran;
- std::unique_ptr<RaiseEventTask> task(new RaiseEventTask(&task_ran));
- thread.Start();
- thread.PostTask(std::move(task));
- EXPECT_TRUE(task_ran.Wait(kEventWaitTimeout));
- thread.Stop();
-}
-
-class ProcessThreadFactory : public TaskQueueFactory {
- public:
- ~ProcessThreadFactory() override = default;
- std::unique_ptr<TaskQueueBase, TaskQueueDeleter> CreateTaskQueue(
- absl::string_view name,
- Priority priority) const override {
- ProcessThreadImpl* process_thread = new ProcessThreadImpl("thread");
- process_thread->Start();
- return std::unique_ptr<TaskQueueBase, TaskQueueDeleter>(process_thread);
- }
-};
-
-INSTANTIATE_TEST_SUITE_P(
- ProcessThread,
- TaskQueueTest,
- testing::Values(std::make_unique<ProcessThreadFactory>));
-
-} // namespace webrtc
diff --git a/modules/video_processing/BUILD.gn b/modules/video_processing/BUILD.gn
index b346118..a5b847f 100644
--- a/modules/video_processing/BUILD.gn
+++ b/modules/video_processing/BUILD.gn
@@ -31,7 +31,6 @@
"../../api/video:video_rtp_headers",
"../../common_audio",
"../../common_video",
- "../../modules/utility",
"../../rtc_base:checks",
"../../rtc_base/system:arch",
"../../system_wrappers",
diff --git a/pc/BUILD.gn b/pc/BUILD.gn
index 5fac258..4195db3 100644
--- a/pc/BUILD.gn
+++ b/pc/BUILD.gn
@@ -2463,7 +2463,6 @@
"../media:rtc_media_tests_utils",
"../modules/audio_processing",
"../modules/audio_processing:api",
- "../modules/utility",
"../p2p:p2p_test_utils",
"../p2p:rtc_p2p",
"../rtc_base",