|  | /* | 
|  | *  Copyright 2004 The WebRTC Project Authors. All rights reserved. | 
|  | * | 
|  | *  Use of this source code is governed by a BSD-style license | 
|  | *  that can be found in the LICENSE file in the root of the source | 
|  | *  tree. An additional intellectual property rights grant can be found | 
|  | *  in the file PATENTS.  All contributing project authors may | 
|  | *  be found in the AUTHORS file in the root of the source tree. | 
|  | */ | 
|  |  | 
|  | #include "rtc_base/thread.h" | 
|  |  | 
|  | #include <algorithm> | 
|  | #include <atomic> | 
|  | #include <cstdint> | 
|  | #include <deque> | 
|  | #include <functional> | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <vector> | 
|  |  | 
|  | #include "absl/functional/any_invocable.h" | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "api/function_view.h" | 
|  | #include "api/location.h" | 
|  | #include "api/task_queue/task_queue_base.h" | 
|  | #include "api/units/time_delta.h" | 
|  | #include "rtc_base/platform_thread_types.h" | 
|  | #include "rtc_base/socket_server.h" | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | #include <comdef.h> | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | #include <pthread.h> | 
|  | #include <time.h> | 
|  | #else | 
|  | #error "Either WEBRTC_WIN or WEBRTC_POSIX needs to be defined." | 
|  | #endif | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | // Disable warning that we don't care about: | 
|  | // warning C4722: destructor never returns, potential memory leak | 
|  | #pragma warning(disable : 4722) | 
|  | #endif | 
|  |  | 
|  | #include <stdio.h> | 
|  |  | 
|  | #include <utility> | 
|  |  | 
|  | #include "absl/algorithm/container.h" | 
|  | #include "absl/cleanup/cleanup.h" | 
|  | #include "api/sequence_checker.h" | 
|  | #include "rtc_base/checks.h" | 
|  | #include "rtc_base/event.h" | 
|  | #include "rtc_base/internal/default_socket_server.h" | 
|  | #include "rtc_base/logging.h" | 
|  | #include "rtc_base/null_socket_server.h" | 
|  | #include "rtc_base/synchronization/mutex.h" | 
|  | #include "rtc_base/time_utils.h" | 
|  | #include "rtc_base/trace_event.h" | 
|  |  | 
|  | #if defined(WEBRTC_MAC) | 
|  | #include "rtc_base/system/cocoa_threading.h" | 
|  |  | 
|  | /* | 
|  | * These are forward-declarations for methods that are part of the | 
|  | * ObjC runtime. They are declared in the private header objc-internal.h. | 
|  | * These calls are what clang inserts when using @autoreleasepool in ObjC, | 
|  | * but here they are used directly in order to keep this file C++. | 
|  | * https://clang.llvm.org/docs/AutomaticReferenceCounting.html#runtime-support | 
|  | */ | 
|  | extern "C" { | 
|  | void* objc_autoreleasePoolPush(void); | 
|  | void objc_autoreleasePoolPop(void* pool); | 
|  | } | 
|  |  | 
|  | namespace { | 
|  | class ScopedAutoReleasePool { | 
|  | public: | 
|  | ScopedAutoReleasePool() : pool_(objc_autoreleasePoolPush()) {} | 
|  | ~ScopedAutoReleasePool() { objc_autoreleasePoolPop(pool_); } | 
|  |  | 
|  | private: | 
|  | void* const pool_; | 
|  | }; | 
|  | }  // namespace | 
|  | #endif | 
|  |  | 
|  | namespace webrtc { | 
|  |  | 
|  |  | 
|  | ThreadManager* ThreadManager::Instance() { | 
|  | static ThreadManager* const thread_manager = new ThreadManager(); | 
|  | return thread_manager; | 
|  | } | 
|  |  | 
|  | ThreadManager::~ThreadManager() { | 
|  | // By above RTC_DEFINE_STATIC_LOCAL. | 
|  | RTC_DCHECK_NOTREACHED() << "ThreadManager should never be destructed."; | 
|  | } | 
|  |  | 
|  | // static | 
|  | void ThreadManager::Add(Thread* message_queue) { | 
|  | return Instance()->AddInternal(message_queue); | 
|  | } | 
|  | void ThreadManager::AddInternal(Thread* message_queue) { | 
|  | MutexLock cs(&crit_); | 
|  | message_queues_.push_back(message_queue); | 
|  | } | 
|  |  | 
|  | // static | 
|  | void ThreadManager::Remove(Thread* message_queue) { | 
|  | return Instance()->RemoveInternal(message_queue); | 
|  | } | 
|  | void ThreadManager::RemoveInternal(Thread* message_queue) { | 
|  | { | 
|  | MutexLock cs(&crit_); | 
|  | std::vector<Thread*>::iterator iter; | 
|  | iter = absl::c_find(message_queues_, message_queue); | 
|  | if (iter != message_queues_.end()) { | 
|  | message_queues_.erase(iter); | 
|  | } | 
|  | #if RTC_DCHECK_IS_ON | 
|  | RemoveFromSendGraph(message_queue); | 
|  | #endif | 
|  | } | 
|  | } | 
|  |  | 
|  | #if RTC_DCHECK_IS_ON | 
|  | void ThreadManager::RemoveFromSendGraph(Thread* thread) { | 
|  | for (auto it = send_graph_.begin(); it != send_graph_.end();) { | 
|  | if (it->first == thread) { | 
|  | it = send_graph_.erase(it); | 
|  | } else { | 
|  | it->second.erase(thread); | 
|  | ++it; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void ThreadManager::RegisterSendAndCheckForCycles(Thread* source, | 
|  | Thread* target) { | 
|  | RTC_DCHECK(source); | 
|  | RTC_DCHECK(target); | 
|  |  | 
|  | MutexLock cs(&crit_); | 
|  | std::deque<Thread*> all_targets({target}); | 
|  | // We check the pre-existing who-sends-to-who graph for any path from target | 
|  | // to source. This loop is guaranteed to terminate because per the send graph | 
|  | // invariant, there are no cycles in the graph. | 
|  | for (size_t i = 0; i < all_targets.size(); i++) { | 
|  | const auto& targets = send_graph_[all_targets[i]]; | 
|  | all_targets.insert(all_targets.end(), targets.begin(), targets.end()); | 
|  | } | 
|  | RTC_CHECK_EQ(absl::c_count(all_targets, source), 0) | 
|  | << " send loop between " << source->name() << " and " << target->name(); | 
|  |  | 
|  | // We may now insert source -> target without creating a cycle, since there | 
|  | // was no path from target to source per the prior CHECK. | 
|  | send_graph_[source].insert(target); | 
|  | } | 
|  | #endif | 
|  |  | 
|  | // static | 
|  | void ThreadManager::ProcessAllMessageQueuesForTesting() { | 
|  | return Instance()->ProcessAllMessageQueuesInternal(); | 
|  | } | 
|  |  | 
|  | void ThreadManager::ProcessAllMessageQueuesInternal() { | 
|  | // This works by posting a delayed message at the current time and waiting | 
|  | // for it to be dispatched on all queues, which will ensure that all messages | 
|  | // that came before it were also dispatched. | 
|  | std::atomic<int> queues_not_done(0); | 
|  |  | 
|  | { | 
|  | MutexLock cs(&crit_); | 
|  | for (Thread* queue : message_queues_) { | 
|  | if (!queue->IsProcessingMessagesForTesting()) { | 
|  | // If the queue is not processing messages, it can | 
|  | // be ignored. If we tried to post a message to it, it would be dropped | 
|  | // or ignored. | 
|  | continue; | 
|  | } | 
|  | queues_not_done.fetch_add(1); | 
|  | // Whether the task is processed, or the thread is simply cleared, | 
|  | // queues_not_done gets decremented. | 
|  | absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); }; | 
|  | // Post delayed task instead of regular task to wait for all delayed tasks | 
|  | // that are ready for processing. | 
|  | queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero()); | 
|  | } | 
|  | } | 
|  |  | 
|  | Thread* current = Thread::Current(); | 
|  | // Note: One of the message queues may have been on this thread, which is | 
|  | // why we can't synchronously wait for queues_not_done to go to 0; we need | 
|  | // to process messages as well. | 
|  | while (queues_not_done.load() > 0) { | 
|  | if (current) { | 
|  | current->ProcessMessages(0); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // static | 
|  | Thread* Thread::Current() { | 
|  | ThreadManager* manager = ThreadManager::Instance(); | 
|  | Thread* thread = manager->CurrentThread(); | 
|  |  | 
|  | return thread; | 
|  | } | 
|  |  | 
|  | #if defined(WEBRTC_POSIX) | 
|  | ThreadManager::ThreadManager() { | 
|  | #if defined(WEBRTC_MAC) | 
|  | InitCocoaMultiThreading(); | 
|  | #endif | 
|  | pthread_key_create(&key_, nullptr); | 
|  | } | 
|  |  | 
|  | Thread* ThreadManager::CurrentThread() { | 
|  | return static_cast<Thread*>(pthread_getspecific(key_)); | 
|  | } | 
|  |  | 
|  | void ThreadManager::SetCurrentThreadInternal(Thread* thread) { | 
|  | pthread_setspecific(key_, thread); | 
|  | } | 
|  | #endif | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | ThreadManager::ThreadManager() : key_(TlsAlloc()) {} | 
|  |  | 
|  | Thread* ThreadManager::CurrentThread() { | 
|  | return static_cast<Thread*>(TlsGetValue(key_)); | 
|  | } | 
|  |  | 
|  | void ThreadManager::SetCurrentThreadInternal(Thread* thread) { | 
|  | TlsSetValue(key_, thread); | 
|  | } | 
|  | #endif | 
|  |  | 
|  | void ThreadManager::SetCurrentThread(Thread* thread) { | 
|  | #if RTC_DLOG_IS_ON | 
|  | if (CurrentThread() && thread) { | 
|  | RTC_DLOG(LS_ERROR) << "SetCurrentThread: Overwriting an existing value?"; | 
|  | } | 
|  | #endif  // RTC_DLOG_IS_ON | 
|  |  | 
|  | if (thread) { | 
|  | thread->EnsureIsCurrentTaskQueue(); | 
|  | } else { | 
|  | Thread* current = CurrentThread(); | 
|  | if (current) { | 
|  | // The current thread is being cleared, e.g. as a result of | 
|  | // UnwrapCurrent() being called or when a thread is being stopped | 
|  | // (see PreRun()). This signals that the Thread instance is being detached | 
|  | // from the thread, which also means that TaskQueue::Current() must not | 
|  | // return a pointer to the Thread instance. | 
|  | current->ClearCurrentTaskQueue(); | 
|  | } | 
|  | } | 
|  |  | 
|  | SetCurrentThreadInternal(thread); | 
|  | } | 
|  |  | 
|  | void ThreadManager::ChangeCurrentThreadForTest(Thread* thread) { | 
|  | SetCurrentThreadInternal(thread); | 
|  | } | 
|  |  | 
|  | Thread* ThreadManager::WrapCurrentThread() { | 
|  | Thread* result = CurrentThread(); | 
|  | if (nullptr == result) { | 
|  | result = new Thread(CreateDefaultSocketServer()); | 
|  | result->WrapCurrentWithThreadManager(this, true); | 
|  | } | 
|  | return result; | 
|  | } | 
|  |  | 
|  | void ThreadManager::UnwrapCurrentThread() { | 
|  | Thread* t = CurrentThread(); | 
|  | if (t && !(t->IsOwned())) { | 
|  | t->UnwrapCurrent(); | 
|  | delete t; | 
|  | } | 
|  | } | 
|  |  | 
|  | Thread::ScopedDisallowBlockingCalls::ScopedDisallowBlockingCalls() | 
|  | : thread_(Thread::Current()), | 
|  | previous_state_(thread_->SetAllowBlockingCalls(false)) {} | 
|  |  | 
|  | Thread::ScopedDisallowBlockingCalls::~ScopedDisallowBlockingCalls() { | 
|  | RTC_DCHECK(thread_->IsCurrent()); | 
|  | thread_->SetAllowBlockingCalls(previous_state_); | 
|  | } | 
|  |  | 
|  | #if RTC_DCHECK_IS_ON | 
|  | Thread::ScopedCountBlockingCalls::ScopedCountBlockingCalls( | 
|  | std::function<void(uint32_t, uint32_t)> callback) | 
|  | : thread_(Thread::Current()), | 
|  | base_blocking_call_count_(thread_->GetBlockingCallCount()), | 
|  | base_could_be_blocking_call_count_( | 
|  | thread_->GetCouldBeBlockingCallCount()), | 
|  | result_callback_(std::move(callback)) {} | 
|  |  | 
|  | Thread::ScopedCountBlockingCalls::~ScopedCountBlockingCalls() { | 
|  | if (GetTotalBlockedCallCount() >= min_blocking_calls_for_callback_) { | 
|  | result_callback_(GetBlockingCallCount(), GetCouldBeBlockingCallCount()); | 
|  | } | 
|  | } | 
|  |  | 
|  | uint32_t Thread::ScopedCountBlockingCalls::GetBlockingCallCount() const { | 
|  | return thread_->GetBlockingCallCount() - base_blocking_call_count_; | 
|  | } | 
|  |  | 
|  | uint32_t Thread::ScopedCountBlockingCalls::GetCouldBeBlockingCallCount() const { | 
|  | return thread_->GetCouldBeBlockingCallCount() - | 
|  | base_could_be_blocking_call_count_; | 
|  | } | 
|  |  | 
|  | uint32_t Thread::ScopedCountBlockingCalls::GetTotalBlockedCallCount() const { | 
|  | return GetBlockingCallCount() + GetCouldBeBlockingCallCount(); | 
|  | } | 
|  | #endif | 
|  |  | 
|  | Thread::Thread(SocketServer* ss) : Thread(ss, /*do_init=*/true) {} | 
|  |  | 
|  | Thread::Thread(std::unique_ptr<SocketServer> ss) | 
|  | : Thread(std::move(ss), /*do_init=*/true) {} | 
|  |  | 
|  | Thread::Thread(SocketServer* ss, bool do_init) | 
|  | : delayed_next_num_(0), | 
|  | fInitialized_(false), | 
|  | fDestroyed_(false), | 
|  | stop_(0), | 
|  | ss_(ss) { | 
|  | RTC_DCHECK(ss); | 
|  | ss_->SetMessageQueue(this); | 
|  | SetName("Thread", this);  // default name | 
|  | if (do_init) { | 
|  | DoInit(); | 
|  | } | 
|  | } | 
|  |  | 
|  | Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init) | 
|  | : Thread(ss.get(), do_init) { | 
|  | own_ss_ = std::move(ss); | 
|  | } | 
|  |  | 
|  | Thread::~Thread() { | 
|  | Stop(); | 
|  | DoDestroy(); | 
|  | } | 
|  |  | 
|  | void Thread::DoInit() { | 
|  | if (fInitialized_) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | fInitialized_ = true; | 
|  | ThreadManager::Add(this); | 
|  | } | 
|  |  | 
|  | void Thread::DoDestroy() { | 
|  | if (fDestroyed_) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | fDestroyed_ = true; | 
|  | // The signal is done from here to ensure | 
|  | // that it always gets called when the queue | 
|  | // is going away. | 
|  | if (ss_) { | 
|  | ss_->SetMessageQueue(nullptr); | 
|  | } | 
|  | ThreadManager::Remove(this); | 
|  | // Clear. | 
|  | CurrentTaskQueueSetter set_current(this); | 
|  | messages_ = {}; | 
|  | delayed_messages_ = {}; | 
|  | } | 
|  |  | 
|  | SocketServer* Thread::socketserver() { | 
|  | return ss_; | 
|  | } | 
|  |  | 
|  | void Thread::WakeUpSocketServer() { | 
|  | ss_->WakeUp(); | 
|  | } | 
|  |  | 
|  | void Thread::Quit() { | 
|  | stop_.store(1, std::memory_order_release); | 
|  | WakeUpSocketServer(); | 
|  | } | 
|  |  | 
|  | bool Thread::IsQuitting() { | 
|  | return stop_.load(std::memory_order_acquire) != 0; | 
|  | } | 
|  |  | 
|  | void Thread::Restart() { | 
|  | stop_.store(0, std::memory_order_release); | 
|  | } | 
|  |  | 
|  | absl::AnyInvocable<void() &&> Thread::Get(int cmsWait) { | 
|  | // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch | 
|  |  | 
|  | int64_t cmsTotal = cmsWait; | 
|  | int64_t cmsElapsed = 0; | 
|  | int64_t msStart = TimeMillis(); | 
|  | int64_t msCurrent = msStart; | 
|  | while (true) { | 
|  | // Check for posted events | 
|  | int64_t cmsDelayNext = kForever; | 
|  | { | 
|  | // All queue operations need to be locked, but nothing else in this loop | 
|  | // can happen while holding the `mutex_`. | 
|  | MutexLock lock(&mutex_); | 
|  | // Check for delayed messages that have been triggered and calculate the | 
|  | // next trigger time. | 
|  | while (!delayed_messages_.empty()) { | 
|  | if (msCurrent < delayed_messages_.top().run_time_ms) { | 
|  | cmsDelayNext = | 
|  | TimeDiff(delayed_messages_.top().run_time_ms, msCurrent); | 
|  | break; | 
|  | } | 
|  | messages_.push(std::move(delayed_messages_.top().functor)); | 
|  | delayed_messages_.pop(); | 
|  | } | 
|  | // Pull a message off the message queue, if available. | 
|  | if (!messages_.empty()) { | 
|  | absl::AnyInvocable<void() &&> task = std::move(messages_.front()); | 
|  | messages_.pop(); | 
|  | return task; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (IsQuitting()) | 
|  | break; | 
|  |  | 
|  | // Which is shorter, the delay wait or the asked wait? | 
|  |  | 
|  | int64_t cmsNext; | 
|  | if (cmsWait == kForever) { | 
|  | cmsNext = cmsDelayNext; | 
|  | } else { | 
|  | cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed); | 
|  | if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) | 
|  | cmsNext = cmsDelayNext; | 
|  | } | 
|  |  | 
|  | { | 
|  | // Wait and multiplex in the meantime | 
|  | if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever | 
|  | : TimeDelta::Millis(cmsNext), | 
|  | /*process_io=*/true)) | 
|  | return nullptr; | 
|  | } | 
|  |  | 
|  | // If the specified timeout expired, return | 
|  |  | 
|  | msCurrent = TimeMillis(); | 
|  | cmsElapsed = TimeDiff(msCurrent, msStart); | 
|  | if (cmsWait != kForever) { | 
|  | if (cmsElapsed >= cmsWait) | 
|  | return nullptr; | 
|  | } | 
|  | } | 
|  | return nullptr; | 
|  | } | 
|  |  | 
|  | void Thread::PostTaskImpl(absl::AnyInvocable<void() &&> task, | 
|  | const PostTaskTraits& /* traits */, | 
|  | const Location& /* location */) { | 
|  | if (IsQuitting()) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Keep thread safe | 
|  | // Add the message to the end of the queue | 
|  | // Signal for the multiplexer to return | 
|  |  | 
|  | { | 
|  | MutexLock lock(&mutex_); | 
|  | messages_.push(std::move(task)); | 
|  | } | 
|  | WakeUpSocketServer(); | 
|  | } | 
|  |  | 
|  | void Thread::PostDelayedTaskImpl(absl::AnyInvocable<void() &&> task, | 
|  | TimeDelta delay, | 
|  | const PostDelayedTaskTraits& /* traits */, | 
|  | const Location& /* location */) { | 
|  | if (IsQuitting()) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | // Keep thread safe | 
|  | // Add to the priority queue. Gets sorted soonest first. | 
|  | // Signal for the multiplexer to return. | 
|  |  | 
|  | int64_t delay_ms = delay.RoundUpTo(TimeDelta::Millis(1)).ms<int>(); | 
|  | int64_t run_time_ms = TimeAfter(delay_ms); | 
|  | { | 
|  | MutexLock lock(&mutex_); | 
|  | delayed_messages_.push({.delay_ms = delay_ms, | 
|  | .run_time_ms = run_time_ms, | 
|  | .message_number = delayed_next_num_, | 
|  | .functor = std::move(task)}); | 
|  | // If this message queue processes 1 message every millisecond for 50 days, | 
|  | // we will wrap this number.  Even then, only messages with identical times | 
|  | // will be misordered, and then only briefly.  This is probably ok. | 
|  | ++delayed_next_num_; | 
|  | RTC_DCHECK_NE(0, delayed_next_num_); | 
|  | } | 
|  | WakeUpSocketServer(); | 
|  | } | 
|  |  | 
|  | int Thread::GetDelay() { | 
|  | MutexLock lock(&mutex_); | 
|  |  | 
|  | if (!messages_.empty()) | 
|  | return 0; | 
|  |  | 
|  | if (!delayed_messages_.empty()) { | 
|  | int delay = TimeUntil(delayed_messages_.top().run_time_ms); | 
|  | if (delay < 0) | 
|  | delay = 0; | 
|  | return delay; | 
|  | } | 
|  |  | 
|  | return kForever; | 
|  | } | 
|  |  | 
|  | void Thread::Dispatch(absl::AnyInvocable<void() &&> task) { | 
|  | TRACE_EVENT0("webrtc", "Thread::Dispatch"); | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | int64_t start_time = TimeMillis(); | 
|  | std::move(task)(); | 
|  | int64_t end_time = TimeMillis(); | 
|  | int64_t diff = TimeDiff(end_time, start_time); | 
|  | if (diff >= dispatch_warning_ms_) { | 
|  | RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff | 
|  | << "ms to dispatch."; | 
|  | // To avoid log spew, move the warning limit to only give warning | 
|  | // for delays that are larger than the one observed. | 
|  | dispatch_warning_ms_ = diff + 1; | 
|  | } | 
|  | } | 
|  |  | 
|  | bool Thread::IsCurrent() const { | 
|  | return ThreadManager::Instance()->CurrentThread() == this; | 
|  | } | 
|  |  | 
|  | std::unique_ptr<Thread> Thread::CreateWithSocketServer() { | 
|  | return std::unique_ptr<Thread>(new Thread(CreateDefaultSocketServer())); | 
|  | } | 
|  |  | 
|  | std::unique_ptr<Thread> Thread::Create() { | 
|  | return std::unique_ptr<Thread>( | 
|  | new Thread(std::unique_ptr<SocketServer>(new NullSocketServer()))); | 
|  | } | 
|  |  | 
|  | bool Thread::SleepMs(int milliseconds) { | 
|  | AssertBlockingIsAllowedOnCurrentThread(); | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | ::Sleep(milliseconds); | 
|  | return true; | 
|  | #else | 
|  | // POSIX has both a usleep() and a nanosleep(), but the former is deprecated, | 
|  | // so we use nanosleep() even though it has greater precision than necessary. | 
|  | struct timespec ts; | 
|  | ts.tv_sec = milliseconds / 1000; | 
|  | ts.tv_nsec = (milliseconds % 1000) * 1000000; | 
|  | int ret = nanosleep(&ts, nullptr); | 
|  | if (ret != 0) { | 
|  | RTC_LOG_ERR(LS_WARNING) << "nanosleep() returning early"; | 
|  | return false; | 
|  | } | 
|  | return true; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | bool Thread::SetName(absl::string_view name, const void* obj) { | 
|  | RTC_DCHECK(!IsRunning()); | 
|  |  | 
|  | name_ = std::string(name); | 
|  | if (obj) { | 
|  | // The %p specifier typically produce at most 16 hex digits, possibly with a | 
|  | // 0x prefix. But format is implementation defined, so add some margin. | 
|  | char buf[30]; | 
|  | snprintf(buf, sizeof(buf), " 0x%p", obj); | 
|  | name_ += buf; | 
|  | } | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void Thread::SetDispatchWarningMs(int deadline) { | 
|  | if (!IsCurrent()) { | 
|  | PostTask([this, deadline]() { SetDispatchWarningMs(deadline); }); | 
|  | return; | 
|  | } | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | dispatch_warning_ms_ = deadline; | 
|  | } | 
|  |  | 
|  | bool Thread::Start() { | 
|  | RTC_DCHECK(!IsRunning()); | 
|  |  | 
|  | if (IsRunning()) | 
|  | return false; | 
|  |  | 
|  | Restart();  // reset IsQuitting() if the thread is being restarted | 
|  |  | 
|  | // Make sure that ThreadManager is created on the main thread before | 
|  | // we start a new thread. | 
|  | ThreadManager::Instance(); | 
|  |  | 
|  | owned_ = true; | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | thread_ = CreateThread(nullptr, 0, PreRun, this, 0, &thread_id_); | 
|  | if (!thread_) { | 
|  | return false; | 
|  | } | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | pthread_attr_t attr; | 
|  | pthread_attr_init(&attr); | 
|  |  | 
|  | int error_code = pthread_create(&thread_, &attr, PreRun, this); | 
|  | if (0 != error_code) { | 
|  | RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code; | 
|  | thread_ = 0; | 
|  | return false; | 
|  | } | 
|  | RTC_DCHECK(thread_); | 
|  | #endif | 
|  | return true; | 
|  | } | 
|  |  | 
|  | bool Thread::WrapCurrent() { | 
|  | return WrapCurrentWithThreadManager(ThreadManager::Instance(), true); | 
|  | } | 
|  |  | 
|  | void Thread::UnwrapCurrent() { | 
|  | // Clears the platform-specific thread-specific storage. | 
|  | ThreadManager::Instance()->SetCurrentThread(nullptr); | 
|  | #if defined(WEBRTC_WIN) | 
|  | if (thread_ != nullptr) { | 
|  | if (!CloseHandle(thread_)) { | 
|  | RTC_LOG_GLE(LS_ERROR) | 
|  | << "When unwrapping thread, failed to close handle."; | 
|  | } | 
|  | thread_ = nullptr; | 
|  | thread_id_ = 0; | 
|  | } | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | thread_ = 0; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | void Thread::SafeWrapCurrent() { | 
|  | WrapCurrentWithThreadManager(ThreadManager::Instance(), false); | 
|  | } | 
|  |  | 
|  | void Thread::Join() { | 
|  | if (!IsRunning()) | 
|  | return; | 
|  |  | 
|  | RTC_DCHECK(!IsCurrent()); | 
|  | if (Current() && !Current()->blocking_calls_allowed_) { | 
|  | RTC_LOG(LS_WARNING) << "Waiting for the thread to join, " | 
|  | "but blocking calls have been disallowed"; | 
|  | } | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | RTC_DCHECK(thread_ != nullptr); | 
|  | WaitForSingleObject(thread_, INFINITE); | 
|  | CloseHandle(thread_); | 
|  | thread_ = nullptr; | 
|  | thread_id_ = 0; | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | pthread_join(thread_, nullptr); | 
|  | thread_ = 0; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | bool Thread::SetAllowBlockingCalls(bool allow) { | 
|  | RTC_DCHECK(IsCurrent()); | 
|  | bool previous = blocking_calls_allowed_; | 
|  | blocking_calls_allowed_ = allow; | 
|  | return previous; | 
|  | } | 
|  |  | 
|  | // static | 
|  | void Thread::AssertBlockingIsAllowedOnCurrentThread() { | 
|  | #if !defined(NDEBUG) | 
|  | Thread* current = Thread::Current(); | 
|  | RTC_DCHECK(!current || current->blocking_calls_allowed_); | 
|  | #endif | 
|  | } | 
|  |  | 
|  | // static | 
|  | #if defined(WEBRTC_WIN) | 
|  | DWORD WINAPI Thread::PreRun(LPVOID pv) { | 
|  | #else | 
|  | void* Thread::PreRun(void* pv) { | 
|  | #endif | 
|  | Thread* thread = static_cast<Thread*>(pv); | 
|  | ThreadManager::Instance()->SetCurrentThread(thread); | 
|  | SetCurrentThreadName(thread->name_.c_str()); | 
|  | #if defined(WEBRTC_MAC) | 
|  | ScopedAutoReleasePool pool; | 
|  | #endif | 
|  | thread->Run(); | 
|  |  | 
|  | ThreadManager::Instance()->SetCurrentThread(nullptr); | 
|  | #ifdef WEBRTC_WIN | 
|  | return 0; | 
|  | #else | 
|  | return nullptr; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | void Thread::Run() { | 
|  | ProcessMessages(kForever); | 
|  | } | 
|  |  | 
|  | bool Thread::IsOwned() { | 
|  | RTC_DCHECK(IsRunning()); | 
|  | return owned_; | 
|  | } | 
|  |  | 
|  | void Thread::Stop() { | 
|  | Thread::Quit(); | 
|  | Join(); | 
|  | } | 
|  |  | 
|  | void Thread::BlockingCallImpl(FunctionView<void()> functor, | 
|  | const Location& /* location */) { | 
|  | TRACE_EVENT0("webrtc", "Thread::BlockingCall"); | 
|  |  | 
|  | RTC_DCHECK(!IsQuitting()); | 
|  | if (IsQuitting()) | 
|  | return; | 
|  |  | 
|  | if (IsCurrent()) { | 
|  | #if RTC_DCHECK_IS_ON | 
|  | RTC_DCHECK(this->IsInvokeToThreadAllowed(this)); | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | could_be_blocking_call_count_++; | 
|  | #endif | 
|  | functor(); | 
|  | return; | 
|  | } | 
|  |  | 
|  | #if RTC_DCHECK_IS_ON | 
|  | if (Thread* current_thread = Thread::Current()) { | 
|  | RTC_DCHECK_RUN_ON(current_thread); | 
|  | RTC_DCHECK(current_thread->blocking_calls_allowed_); | 
|  | current_thread->blocking_call_count_++; | 
|  | RTC_DCHECK(current_thread->IsInvokeToThreadAllowed(this)); | 
|  | ThreadManager::Instance()->RegisterSendAndCheckForCycles(current_thread, | 
|  | this); | 
|  | } | 
|  | #endif | 
|  |  | 
|  | Event done; | 
|  | absl::Cleanup cleanup = [&done] { done.Set(); }; | 
|  | PostTask([functor, cleanup = std::move(cleanup)] { functor(); }); | 
|  | done.Wait(Event::kForever); | 
|  | } | 
|  |  | 
|  | // Called by the ThreadManager when being set as the current thread. | 
|  | void Thread::EnsureIsCurrentTaskQueue() { | 
|  | task_queue_registration_ = | 
|  | std::make_unique<TaskQueueBase::CurrentTaskQueueSetter>(this); | 
|  | } | 
|  |  | 
|  | // Called by the ThreadManager when being set as the current thread. | 
|  | void Thread::ClearCurrentTaskQueue() { | 
|  | task_queue_registration_.reset(); | 
|  | } | 
|  |  | 
|  | void Thread::AllowInvokesToThread(Thread* thread) { | 
|  | #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) | 
|  | if (!IsCurrent()) { | 
|  | PostTask([thread, this]() { AllowInvokesToThread(thread); }); | 
|  | return; | 
|  | } | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | allowed_threads_.push_back(thread); | 
|  | invoke_policy_enabled_ = true; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | void Thread::DisallowAllInvokes() { | 
|  | #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) | 
|  | if (!IsCurrent()) { | 
|  | PostTask([this]() { DisallowAllInvokes(); }); | 
|  | return; | 
|  | } | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | allowed_threads_.clear(); | 
|  | invoke_policy_enabled_ = true; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | #if RTC_DCHECK_IS_ON | 
|  | uint32_t Thread::GetBlockingCallCount() const { | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | return blocking_call_count_; | 
|  | } | 
|  | uint32_t Thread::GetCouldBeBlockingCallCount() const { | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | return could_be_blocking_call_count_; | 
|  | } | 
|  | #endif | 
|  |  | 
|  | // Returns true if no policies added or if there is at least one policy | 
|  | // that permits invocation to `target` thread. | 
|  | bool Thread::IsInvokeToThreadAllowed(Thread* target) { | 
|  | #if (!defined(NDEBUG) || RTC_DCHECK_IS_ON) | 
|  | RTC_DCHECK_RUN_ON(this); | 
|  | if (!invoke_policy_enabled_) { | 
|  | return true; | 
|  | } | 
|  | for (const auto* thread : allowed_threads_) { | 
|  | if (thread == target) { | 
|  | return true; | 
|  | } | 
|  | } | 
|  | return false; | 
|  | #else | 
|  | return true; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | void Thread::Delete() { | 
|  | Stop(); | 
|  | delete this; | 
|  | } | 
|  |  | 
|  | bool Thread::IsProcessingMessagesForTesting() { | 
|  | return (owned_ || IsCurrent()) && !IsQuitting(); | 
|  | } | 
|  |  | 
|  | bool Thread::ProcessMessages(int cmsLoop) { | 
|  | // Using ProcessMessages with a custom clock for testing and a time greater | 
|  | // than 0 doesn't work, since it's not guaranteed to advance the custom | 
|  | // clock's time, and may get stuck in an infinite loop. | 
|  | RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 || | 
|  | cmsLoop == kForever); | 
|  | int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop); | 
|  | int cmsNext = cmsLoop; | 
|  |  | 
|  | while (true) { | 
|  | #if defined(WEBRTC_MAC) | 
|  | ScopedAutoReleasePool pool; | 
|  | #endif | 
|  | absl::AnyInvocable<void() &&> task = Get(cmsNext); | 
|  | if (!task) | 
|  | return !IsQuitting(); | 
|  | Dispatch(std::move(task)); | 
|  |  | 
|  | if (cmsLoop != kForever) { | 
|  | cmsNext = static_cast<int>(TimeUntil(msEnd)); | 
|  | if (cmsNext < 0) | 
|  | return true; | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | bool Thread::WrapCurrentWithThreadManager( | 
|  | ThreadManager* thread_manager, | 
|  | [[maybe_unused]] bool need_synchronize_access) { | 
|  | RTC_DCHECK(!IsRunning()); | 
|  |  | 
|  | #if defined(WEBRTC_WIN) | 
|  | if (need_synchronize_access) { | 
|  | // We explicitly ask for no rights other than synchronization. | 
|  | // This gives us the best chance of succeeding. | 
|  | thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId()); | 
|  | if (!thread_) { | 
|  | RTC_LOG_GLE(LS_ERROR) << "Unable to get handle to thread."; | 
|  | return false; | 
|  | } | 
|  | thread_id_ = GetCurrentThreadId(); | 
|  | } | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | thread_ = pthread_self(); | 
|  | #endif | 
|  | owned_ = false; | 
|  | thread_manager->SetCurrentThread(this); | 
|  | return true; | 
|  | } | 
|  |  | 
|  | bool Thread::IsRunning() { | 
|  | #if defined(WEBRTC_WIN) | 
|  | return thread_ != nullptr; | 
|  | #elif defined(WEBRTC_POSIX) | 
|  | return thread_ != 0; | 
|  | #endif | 
|  | } | 
|  |  | 
|  | AutoThread::AutoThread() | 
|  | : Thread(CreateDefaultSocketServer(), /*do_init=*/false) { | 
|  | if (!ThreadManager::Instance()->CurrentThread()) { | 
|  | // DoInit registers with ThreadManager. Do that only if we intend to | 
|  | // be webrtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal | 
|  | // will post a message to a queue that no running thread is serving. | 
|  | DoInit(); | 
|  | ThreadManager::Instance()->SetCurrentThread(this); | 
|  | } | 
|  | } | 
|  |  | 
|  | AutoThread::~AutoThread() { | 
|  | Stop(); | 
|  | DoDestroy(); | 
|  | if (ThreadManager::Instance()->CurrentThread() == this) { | 
|  | ThreadManager::Instance()->SetCurrentThread(nullptr); | 
|  | } | 
|  | } | 
|  |  | 
|  | AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) | 
|  | : Thread(ss, /*do_init=*/false) { | 
|  | DoInit(); | 
|  | old_thread_ = ThreadManager::Instance()->CurrentThread(); | 
|  | // Temporarily set the current thread to nullptr so that we can keep checks | 
|  | // around that catch unintentional pointer overwrites. | 
|  | ThreadManager::Instance()->SetCurrentThread(nullptr); | 
|  | ThreadManager::Instance()->SetCurrentThread(this); | 
|  | if (old_thread_) { | 
|  | ThreadManager::Remove(old_thread_); | 
|  | } | 
|  | } | 
|  |  | 
|  | AutoSocketServerThread::~AutoSocketServerThread() { | 
|  | RTC_DCHECK(ThreadManager::Instance()->CurrentThread() == this); | 
|  | // Stop and destroy the thread before clearing it as the current thread. | 
|  | // Sometimes there are messages left in the Thread that will be | 
|  | // destroyed by DoDestroy, and sometimes the destructors of the message and/or | 
|  | // its contents rely on this thread still being set as the current thread. | 
|  | Stop(); | 
|  | DoDestroy(); | 
|  | ThreadManager::Instance()->SetCurrentThread(nullptr); | 
|  | ThreadManager::Instance()->SetCurrentThread(old_thread_); | 
|  | if (old_thread_) { | 
|  | ThreadManager::Add(old_thread_); | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace webrtc |