Fix a problem in Thread::Send. Previously if thread A->Send is called on thread B, B->ReceiveSends will be called, which enables an arbitrary thread to invoke calls on B while B is wait for A->Send to return. This caused mutliple problems like issue 3559, 3579. The fix is to limit B->ReceiveSends to only process requests from A. Also disallow the worker thread invoking other threads. BUG=3559 R=juberti@webrtc.org Review URL: https://webrtc-codereview.appspot.com/15089004 git-svn-id: http://webrtc.googlecode.com/svn/trunk@7290 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/peerconnection_unittest.cc b/talk/app/webrtc/peerconnection_unittest.cc index 0d3e426..977fc11 100644 --- a/talk/app/webrtc/peerconnection_unittest.cc +++ b/talk/app/webrtc/peerconnection_unittest.cc
@@ -481,9 +481,8 @@ if (!allocator_factory_) { return false; } - audio_thread_.Start(); fake_audio_capture_module_ = FakeAudioCaptureModule::Create( - &audio_thread_); + rtc::Thread::Current()); if (fake_audio_capture_module_ == NULL) { return false; @@ -557,12 +556,6 @@ } std::string id_; - // Separate thread for executing |fake_audio_capture_module_| tasks. Audio - // processing must not be performed on the same thread as signaling due to - // signaling time constraints and relative complexity of the audio pipeline. - // This is consistent with the video pipeline that us a a separate thread for - // encoding and decoding. - rtc::Thread audio_thread_; rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface> allocator_factory_;
diff --git a/talk/app/webrtc/peerconnectionfactory.cc b/talk/app/webrtc/peerconnectionfactory.cc index 5dccba8..862ceda 100644 --- a/talk/app/webrtc/peerconnectionfactory.cc +++ b/talk/app/webrtc/peerconnectionfactory.cc
@@ -41,6 +41,7 @@ #include "talk/media/webrtc/webrtcmediaengine.h" #include "talk/media/webrtc/webrtcvideodecoderfactory.h" #include "talk/media/webrtc/webrtcvideoencoderfactory.h" +#include "webrtc/base/bind.h" #include "webrtc/modules/audio_device/include/audio_device.h" using rtc::scoped_refptr;
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.cc b/talk/app/webrtc/test/peerconnectiontestwrapper.cc index 8a4f45c..24932b8 100644 --- a/talk/app/webrtc/test/peerconnectiontestwrapper.cc +++ b/talk/app/webrtc/test/peerconnectiontestwrapper.cc
@@ -75,9 +75,8 @@ return false; } - audio_thread_.Start(); fake_audio_capture_module_ = FakeAudioCaptureModule::Create( - &audio_thread_); + rtc::Thread::Current()); if (fake_audio_capture_module_ == NULL) { return false; }
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.h b/talk/app/webrtc/test/peerconnectiontestwrapper.h index f3477ce..d4a0e4e 100644 --- a/talk/app/webrtc/test/peerconnectiontestwrapper.h +++ b/talk/app/webrtc/test/peerconnectiontestwrapper.h
@@ -111,7 +111,6 @@ bool video, const webrtc::FakeConstraints& video_constraints); std::string name_; - rtc::Thread audio_thread_; rtc::scoped_refptr<webrtc::PortAllocatorFactoryInterface> allocator_factory_; rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
diff --git a/talk/session/media/channelmanager.cc b/talk/session/media/channelmanager.cc index 45e7e47..199bc86 100644 --- a/talk/session/media/channelmanager.cc +++ b/talk/session/media/channelmanager.cc
@@ -137,6 +137,12 @@ this, &ChannelManager::OnVideoCaptureStateChange); capture_manager_->SignalCapturerStateChange.connect( this, &ChannelManager::OnVideoCaptureStateChange); + + if (worker_thread_ != rtc::Thread::Current()) { + // Do not allow invoking calls to other threads on the worker thread. + worker_thread_->Invoke<bool>( + rtc::Bind(&rtc::Thread::SetAllowBlockingCalls, worker_thread_, false)); + } } ChannelManager::~ChannelManager() {
diff --git a/webrtc/base/thread.cc b/webrtc/base/thread.cc index 9d2917d..40257ab 100644 --- a/webrtc/base/thread.cc +++ b/webrtc/base/thread.cc
@@ -411,15 +411,12 @@ } void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) { - AssertBlockingIsAllowedOnCurrentThread(); - if (fStop_) return; // Sent messages are sent to the MessageHandler directly, in the context // of "thread", like Win32 SendMessage. If in the right context, // call the handler directly. - Message msg; msg.phandler = phandler; msg.message_id = id; @@ -429,6 +426,8 @@ return; } + AssertBlockingIsAllowedOnCurrentThread(); + AutoThread thread; Thread *current_thread = Thread::Current(); ASSERT(current_thread != NULL); // AutoThread ensures this @@ -451,7 +450,9 @@ crit_.Enter(); while (!ready) { crit_.Leave(); - current_thread->ReceiveSends(); + // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary + // thread invoking calls on the current thread. + current_thread->ReceiveSendsFromThread(this); current_thread->socketserver()->Wait(kForever, false); waited = true; crit_.Enter(); @@ -475,17 +476,23 @@ } void Thread::ReceiveSends() { + ReceiveSendsFromThread(NULL); +} + +void Thread::ReceiveSendsFromThread(const Thread* source) { // Receive a sent message. Cleanup scenarios: // - thread sending exits: We don't allow this, since thread can exit // only via Join, so Send must complete. // - thread receiving exits: Wakeup/set ready in Thread::Clear() // - object target cleared: Wakeup/set ready in Thread::Clear() + _SendMessage smsg; + crit_.Enter(); - while (!sendlist_.empty()) { - _SendMessage smsg = sendlist_.front(); - sendlist_.pop_front(); + while (PopSendMessageFromThread(source, &smsg)) { crit_.Leave(); + smsg.msg.phandler->OnMessage(&smsg.msg); + crit_.Enter(); *smsg.ready = true; smsg.thread->socketserver()->WakeUp(); @@ -493,6 +500,18 @@ crit_.Leave(); } +bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) { + for (std::list<_SendMessage>::iterator it = sendlist_.begin(); + it != sendlist_.end(); ++it) { + if (it->thread == source || source == NULL) { + *msg = *it; + sendlist_.erase(it); + return true; + } + } + return false; +} + void Thread::Clear(MessageHandler *phandler, uint32 id, MessageList* removed) { CritScope cs(&crit_);
diff --git a/webrtc/base/thread.h b/webrtc/base/thread.h index 25b0f56..34ec45e 100644 --- a/webrtc/base/thread.h +++ b/webrtc/base/thread.h
@@ -165,7 +165,6 @@ // See ScopedDisallowBlockingCalls for details. template <class ReturnT, class FunctorT> ReturnT Invoke(const FunctorT& functor) { - AssertBlockingIsAllowedOnCurrentThread(); FunctorMessageHandler<ReturnT, FunctorT> handler(functor); Send(&handler); return handler.result(); @@ -210,6 +209,10 @@ // of whatever code is conditionally executing because of the return value! bool RunningForTest() { return running(); } + // Sets the per-thread allow-blocking-calls flag and returns the previous + // value. + bool SetAllowBlockingCalls(bool allow); + protected: // This method should be called when thread is created using non standard // method, like derived implementation of rtc::Thread and it can not be @@ -226,10 +229,6 @@ // Blocks the calling thread until this thread has terminated. void Join(); - // Sets the per-thread allow-blocking-calls flag and returns the previous - // value. - bool SetAllowBlockingCalls(bool allow); - static void AssertBlockingIsAllowedOnCurrentThread(); friend class ScopedDisallowBlockingCalls; @@ -248,6 +247,16 @@ // Return true if the thread was started and hasn't yet stopped. bool running() { return running_.Wait(0); } + // Processes received "Send" requests. If |source| is not NULL, only requests + // from |source| are processed, otherwise, all requests are processed. + void ReceiveSendsFromThread(const Thread* source); + + // If |source| is not NULL, pops the first "Send" message from |source| in + // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. + // The caller must lock |crit_| before calling. + // Returns true if there is such a message. + bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); + std::list<_SendMessage> sendlist_; std::string name_; ThreadPriority priority_;
diff --git a/webrtc/base/thread_unittest.cc b/webrtc/base/thread_unittest.cc index 4229df2..57b6df6 100644 --- a/webrtc/base/thread_unittest.cc +++ b/webrtc/base/thread_unittest.cc
@@ -276,6 +276,78 @@ thread.Invoke<void>(&LocalFuncs::Func2); } +// Verifies that two threads calling Invoke on each other at the same time does +// not deadlock. +TEST(ThreadTest, TwoThreadsInvokeNoDeadlock) { + AutoThread thread; + Thread* current_thread = Thread::Current(); + ASSERT_TRUE(current_thread != NULL); + + Thread other_thread; + other_thread.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke<void>(Bind(&Set, out)); + } + }; + + bool called = false; + other_thread.Invoke<void>( + Bind(&LocalFuncs::InvokeSet, current_thread, &called)); + + EXPECT_TRUE(called); +} + +// Verifies that if thread A invokes a call on thread B and thread C is trying +// to invoke A at the same time, thread A does not handle C's invoke while +// invoking B. +TEST(ThreadTest, ThreeThreadsInvoke) { + AutoThread thread; + Thread* thread_a = Thread::Current(); + Thread thread_b, thread_c; + thread_b.Start(); + thread_c.Start(); + + struct LocalFuncs { + static void Set(bool* out) { *out = true; } + static void InvokeSet(Thread* thread, bool* out) { + thread->Invoke<void>(Bind(&Set, out)); + } + + // Set |out| true and call InvokeSet on |thread|. + static void SetAndInvokeSet(bool* out, Thread* thread, bool* out_inner) { + *out = true; + InvokeSet(thread, out_inner); + } + + // Asynchronously invoke SetAndInvokeSet on |thread1| and wait until + // |thread1| starts the call. + static void AsyncInvokeSetAndWait( + Thread* thread1, Thread* thread2, bool* out) { + bool async_invoked = false; + + AsyncInvoker invoker; + invoker.AsyncInvoke<void>( + thread1, Bind(&SetAndInvokeSet, &async_invoked, thread2, out)); + + EXPECT_TRUE_WAIT(async_invoked, 2000); + } + }; + + bool thread_a_called = false; + + // Start the sequence A --(invoke)--> B --(async invoke)--> C --(invoke)--> A. + // Thread B returns when C receives the call and C should be blocked until A + // starts to process messages. + thread_b.Invoke<void>(Bind(&LocalFuncs::AsyncInvokeSetAndWait, + &thread_c, thread_a, &thread_a_called)); + EXPECT_FALSE(thread_a_called); + + EXPECT_TRUE_WAIT(thread_a_called, 2000); +} + class AsyncInvokeTest : public testing::Test { public: void IntCallback(int value) {