Use asynchronous posting for PeerConnection signaling tasks Update RunOnSignalingThread to use PostTask with thread safety flags instead of BlockingCall. The places where RunOnSignalingThread is used, bypass the proxy but used BlockingCall to remain functionally compatible with the proxies. The methods are however asynchronous, so the BlockingCall can be avoided. That was not done in the first step when RunOnSignalingThread was introduced in order to not bring too many changes (the main focus was on allowing non thread-hopping getters). This change is a follow up step that transitions the initial step of the task execution from a synchronous, blocking operation to an asynchronous when called from a different thread. As a btw change, SetLocalDescription and SetRemoteDescription now consistently manage the lifetime of their arguments using std::unique_ptr and scoped_refptr to ensure that SessionDescriptionInterface and SetSessionDescriptionObserver objects remain valid until the asynchronous task completes on the signaling thread, preventing potential use-after-free or memory leak issues. Bug: webrtc:442220720 Change-Id: I71cda2643c8ddd89622436df08bf889c0a35c56d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/454980 Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#47095}
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc index 5781274..cd4ad8b 100644 --- a/pc/peer_connection.cc +++ b/pc/peer_connection.cc
@@ -1538,12 +1538,15 @@ void PeerConnection::SetLocalDescription( SetSessionDescriptionObserver* observer, SessionDescriptionInterface* desc) { - if (!CheckValidSetDescription(observer, desc, signaling_thread())) + auto desc_ptr = std::unique_ptr<SessionDescriptionInterface>(desc); + if (!CheckValidSetDescription(observer, desc_ptr, signaling_thread())) return; - RunOnSignalingThread([this, desc, observer]() mutable { + RunOnSignalingThread([this, desc = std::move(desc_ptr), + observer = scoped_refptr<SetSessionDescriptionObserver>( + observer)]() mutable { RTC_DCHECK_RUN_ON(signaling_thread()); - sdp_handler_->SetLocalDescription(observer, desc); + sdp_handler_->SetLocalDescription(std::move(observer), std::move(desc)); }); } @@ -1564,7 +1567,8 @@ void PeerConnection::SetLocalDescription( SetSessionDescriptionObserver* observer) { RTC_DCHECK_RUN_ON(signaling_thread()); - sdp_handler_->SetLocalDescription(observer); + sdp_handler_->SetLocalDescription( + scoped_refptr<SetSessionDescriptionObserver>(observer)); } void PeerConnection::SetLocalDescription( @@ -1576,12 +1580,15 @@ void PeerConnection::SetRemoteDescription( SetSessionDescriptionObserver* observer, SessionDescriptionInterface* desc) { - if (!CheckValidSetDescription(observer, desc, signaling_thread())) + auto desc_ptr = std::unique_ptr<SessionDescriptionInterface>(desc); + if (!CheckValidSetDescription(observer, desc_ptr, signaling_thread())) return; - RunOnSignalingThread([this, desc, observer]() mutable { + RunOnSignalingThread([this, desc = std::move(desc_ptr), + observer = scoped_refptr<SetSessionDescriptionObserver>( + observer)]() mutable { RTC_DCHECK_RUN_ON(signaling_thread()); - sdp_handler_->SetRemoteDescription(observer, desc); + sdp_handler_->SetRemoteDescription(std::move(observer), std::move(desc)); }); } @@ -3196,12 +3203,8 @@ if (signaling_thread()->IsCurrent()) { std::move(task)(); } else { - // Consider if we can use PostTask instead in some situations: - // signaling_thread()->PostTask( - // SafeTask(signaling_thread_safety_.flag(), std::move(task))); - // Currently we use BlockingCall() to be compatible with how the - // api proxies work by default. - signaling_thread()->BlockingCall([&] { std::move(task)(); }); + signaling_thread()->PostTask( + SafeTask(signaling_thread_safety_.flag(), std::move(task))); } }
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc index 10f2f13..590a465 100644 --- a/pc/sdp_offer_answer.cc +++ b/pc/sdp_offer_answer.cc
@@ -1733,19 +1733,18 @@ } void SdpOfferAnswerHandler::SetLocalDescription( - SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc_ptr) { + scoped_refptr<SetSessionDescriptionObserver> observer, + std::unique_ptr<SessionDescriptionInterface> desc) { RTC_LOG_THREAD_BLOCK_COUNT(); RTC_DCHECK_RUN_ON(signaling_thread()); - RTC_DCHECK(desc_ptr); + RTC_DCHECK(desc); RTC_DCHECK(observer); // Chain this operation. If asynchronous operations are pending on the chain, // this operation will be queued to be invoked, otherwise the contents of the // lambda will execute immediately. operations_chain_->ChainOperation( [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), - observer_refptr = scoped_refptr<SetSessionDescriptionObserver>(observer), - desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)]( + observer_refptr = observer, desc = std::move(desc)]( std::function<void()> operations_chain_callback) mutable { // Abort early if `this_weak_ptr` is no longer valid. if (!this_weak_ptr) { @@ -1801,13 +1800,13 @@ } void SdpOfferAnswerHandler::SetLocalDescription( - SetSessionDescriptionObserver* observer) { + scoped_refptr<SetSessionDescriptionObserver> observer) { RTC_LOG_THREAD_BLOCK_COUNT(); RTC_DCHECK_RUN_ON(signaling_thread()); - RTC_DCHECK(observer); - SetLocalDescription(make_ref_counted<SetSessionDescriptionObserverAdapter>( - weak_ptr_factory_.GetWeakPtr(), - scoped_refptr<SetSessionDescriptionObserver>(observer))); + scoped_refptr<SetLocalDescriptionObserverInterface> adapter = + make_ref_counted<SetSessionDescriptionObserverAdapter>( + weak_ptr_factory_.GetWeakPtr(), std::move(observer)); + SetLocalDescription(std::move(adapter)); } void SdpOfferAnswerHandler::SetLocalDescription( @@ -2144,36 +2143,15 @@ } void SdpOfferAnswerHandler::SetRemoteDescription( - SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc_ptr) { + scoped_refptr<SetSessionDescriptionObserver> observer, + std::unique_ptr<SessionDescriptionInterface> desc) { RTC_DCHECK_RUN_ON(signaling_thread()); RTC_DCHECK(observer); - RTC_DCHECK(desc_ptr); - // Chain this operation. If asynchronous operations are pending on the chain, - // this operation will be queued to be invoked, otherwise the contents of the - // lambda will execute immediately. - operations_chain_->ChainOperation( - [this_weak_ptr = weak_ptr_factory_.GetWeakPtr(), - observer_refptr = scoped_refptr<SetSessionDescriptionObserver>(observer), - desc = std::unique_ptr<SessionDescriptionInterface>(desc_ptr)]( - std::function<void()> operations_chain_callback) mutable { - // Abort early if `this_weak_ptr` is no longer valid. - if (!this_weak_ptr) { - // For consistency with SetSessionDescriptionObserverAdapter whose - // posted messages doesn't get processed when the PC is destroyed, we - // do not inform `observer_refptr` that the operation failed. - operations_chain_callback(); - return; - } - // SetSessionDescriptionObserverAdapter takes care of making sure the - // `observer_refptr` is invoked in a posted message. - this_weak_ptr->DoSetRemoteDescription( - std::make_unique<RemoteDescriptionOperation>( - this_weak_ptr.get(), std::move(desc), - make_ref_counted<SetSessionDescriptionObserverAdapter>( - this_weak_ptr, observer_refptr), - std::move(operations_chain_callback))); - }); + RTC_DCHECK(desc); + SetRemoteDescription( + std::move(desc), + make_ref_counted<SetSessionDescriptionObserverAdapter>( + weak_ptr_factory_.GetWeakPtr(), std::move(observer))); } void SdpOfferAnswerHandler::SetRemoteDescription(
diff --git a/pc/sdp_offer_answer.h b/pc/sdp_offer_answer.h index 05fca59..7be0f66 100644 --- a/pc/sdp_offer_answer.h +++ b/pc/sdp_offer_answer.h
@@ -148,15 +148,18 @@ scoped_refptr<SetLocalDescriptionObserverInterface> observer); void SetLocalDescription( scoped_refptr<SetLocalDescriptionObserverInterface> observer); - void SetLocalDescription(SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc); - void SetLocalDescription(SetSessionDescriptionObserver* observer); + void SetLocalDescription( + scoped_refptr<SetSessionDescriptionObserver> observer, + std::unique_ptr<SessionDescriptionInterface> desc); + void SetLocalDescription( + scoped_refptr<SetSessionDescriptionObserver> observer); void SetRemoteDescription( std::unique_ptr<SessionDescriptionInterface> desc, scoped_refptr<SetRemoteDescriptionObserverInterface> observer); - void SetRemoteDescription(SetSessionDescriptionObserver* observer, - SessionDescriptionInterface* desc); + void SetRemoteDescription( + scoped_refptr<SetSessionDescriptionObserver> observer, + std::unique_ptr<SessionDescriptionInterface> desc); PeerConnectionInterface::RTCConfiguration GetConfiguration(); RTCError SetConfiguration(