Reland "Fix unsynchronized access to mid_to_transport_ in JsepTransportController"
This reverts commit 6b143c1c0686519bc9d73223c1350cee286c8d78.
Reason for revert:
Relanding with updated expectations for SctpTransport::Information
based on TransceiverStateSurfacer in Chromium.
Original change's description:
> Revert "Fix unsynchronized access to mid_to_transport_ in JsepTransportController"
>
> This reverts commit 6cd405850467683cf10d05028ea0f644a68a91a4.
>
> Reason for revert: Breaks WebRTC Chromium FYI Bots
>
> First failure:
> https://ci.chromium.org/p/chromium/builders/webrtc.fyi/WebRTC%20Chromium%20FYI%20Android%20Tests%20%28dbg%29%20%28L%20Nexus5%29/1925
>
> Failed tests:
> WebRtcDataBrowserTest.CallWithSctpDataAndMedia
> WebRtcDataBrowserTest.CallWithSctpDataOnly
>
> Original change's description:
> > Fix unsynchronized access to mid_to_transport_ in JsepTransportController
> >
> > * Added several thread checks to JTC to help with programmer errors.
> > * Avoid a few Invoke() calls to the network thread here and there, such
> > as for fetching the sctp transport name for getStats(). The transport
> > name is now cached when it changes on the network thread.
> > * JsepTransportController instances now get deleted on the network
> > thread rather than on the signaling thread + issuing an Invoke()
> > in the dtor.
> > * Moved some thread hops from JTC over to PC which is where the problem
> > exists and also (imho) makes it easier to see where hops happen in
> > the PC code.
> > * The sctp transport is now started asynchronously when we push down the
> > media description.
> > * PeerConnection proxy calls GetSctpTransport directly on the network
> > thread instead of calling it on the signaling thread and blocking on
> > the network thread.
> > * The above changes simplified things for webrtc::SctpTransport, which
> > allowed for removing locking from that class and deleting some code.
> >
> > Bug: webrtc:9987, webrtc:12445
> > Change-Id: Ic89a9426e314e1b93c81751d4f732f05fa448fbc
> > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205620
> > Commit-Queue: Tommi <tommi@webrtc.org>
> > Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> > Cr-Commit-Position: refs/heads/master@{#33191}
>
> TBR=tommi@webrtc.org,hta@webrtc.org
>
> Change-Id: I7b2913d5133807589461105cf07eff3e9bb7157e
> No-Presubmit: true
> No-Tree-Checks: true
> No-Try: true
> Bug: webrtc:9987
> Bug: webrtc:12445
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206466
> Reviewed-by: Guido Urdaneta <guidou@webrtc.org>
> Commit-Queue: Guido Urdaneta <guidou@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33204}
TBR=tommi@webrtc.org,hta@webrtc.org,guidou@webrtc.org
# Not skipping CQ checks because this is a reland.
Bug: webrtc:9987
Bug: webrtc:12445
Change-Id: Icb205cbac493ed3b881d71ea3af4fb9018701bf4
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/206560
Reviewed-by: Tommi <tommi@webrtc.org>
Reviewed-by: Guido Urdaneta <guidou@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33219}
diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc
index 542dae4..0ded1de 100644
--- a/pc/jsep_transport_controller.cc
+++ b/pc/jsep_transport_controller.cc
@@ -105,10 +105,8 @@
JsepTransportController::~JsepTransportController() {
// Channel destructors may try to send packets, so this needs to happen on
// the network thread.
- network_thread_->Invoke<void>(RTC_FROM_HERE, [this] {
- RTC_DCHECK_RUN_ON(network_thread_);
- DestroyAllJsepTransports_n();
- });
+ RTC_DCHECK_RUN_ON(network_thread_);
+ DestroyAllJsepTransports_n();
}
RTCError JsepTransportController::SetLocalDescription(
@@ -145,6 +143,7 @@
RtpTransportInternal* JsepTransportController::GetRtpTransport(
const std::string& mid) const {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -154,6 +153,7 @@
DataChannelTransportInterface* JsepTransportController::GetDataChannelTransport(
const std::string& mid) const {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -163,6 +163,7 @@
cricket::DtlsTransportInternal* JsepTransportController::GetDtlsTransport(
const std::string& mid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -172,6 +173,7 @@
const cricket::DtlsTransportInternal*
JsepTransportController::GetRtcpDtlsTransport(const std::string& mid) const {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -181,6 +183,7 @@
rtc::scoped_refptr<webrtc::DtlsTransport>
JsepTransportController::LookupDtlsTransportByMid(const std::string& mid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -190,6 +193,7 @@
rtc::scoped_refptr<SctpTransport> JsepTransportController::GetSctpTransport(
const std::string& mid) const {
+ RTC_DCHECK_RUN_ON(network_thread_);
auto jsep_transport = GetJsepTransportForMid(mid);
if (!jsep_transport) {
return nullptr;
@@ -236,11 +240,16 @@
absl::optional<rtc::SSLRole> JsepTransportController::GetDtlsRole(
const std::string& mid) const {
+ // TODO(tommi): Remove this hop. Currently it's called from the signaling
+ // thread during negotiations, potentially multiple times.
+ // WebRtcSessionDescriptionFactory::InternalCreateAnswer is one example.
if (!network_thread_->IsCurrent()) {
return network_thread_->Invoke<absl::optional<rtc::SSLRole>>(
RTC_FROM_HERE, [&] { return GetDtlsRole(mid); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
const cricket::JsepTransport* t = GetJsepTransportForMid(mid);
if (!t) {
return absl::optional<rtc::SSLRole>();
@@ -846,24 +855,34 @@
bool JsepTransportController::SetTransportForMid(
const std::string& mid,
cricket::JsepTransport* jsep_transport) {
- RTC_DCHECK(jsep_transport);
- if (mid_to_transport_[mid] == jsep_transport) {
- return true;
- }
RTC_DCHECK_RUN_ON(network_thread_);
+ RTC_DCHECK(jsep_transport);
+
+ auto it = mid_to_transport_.find(mid);
+ if (it != mid_to_transport_.end() && it->second == jsep_transport)
+ return true;
+
pending_mids_.push_back(mid);
- mid_to_transport_[mid] = jsep_transport;
+
+ if (it == mid_to_transport_.end()) {
+ mid_to_transport_.insert(std::make_pair(mid, jsep_transport));
+ } else {
+ it->second = jsep_transport;
+ }
+
return config_.transport_observer->OnTransportChanged(
mid, jsep_transport->rtp_transport(), jsep_transport->RtpDtlsTransport(),
jsep_transport->data_channel_transport());
}
void JsepTransportController::RemoveTransportForMid(const std::string& mid) {
+ RTC_DCHECK_RUN_ON(network_thread_);
bool ret = config_.transport_observer->OnTransportChanged(mid, nullptr,
nullptr, nullptr);
// Calling OnTransportChanged with nullptr should always succeed, since it is
// only expected to fail when adding media to a transport (not removing).
RTC_DCHECK(ret);
+
mid_to_transport_.erase(mid);
}
diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h
index 506a418..59d66a2 100644
--- a/pc/jsep_transport_controller.h
+++ b/pc/jsep_transport_controller.h
@@ -363,8 +363,9 @@
// transports are bundled on (In current implementation, it is the first
// content in the BUNDLE group).
const cricket::JsepTransport* GetJsepTransportForMid(
- const std::string& mid) const;
- cricket::JsepTransport* GetJsepTransportForMid(const std::string& mid);
+ const std::string& mid) const RTC_RUN_ON(network_thread_);
+ cricket::JsepTransport* GetJsepTransportForMid(const std::string& mid)
+ RTC_RUN_ON(network_thread_);
// Get the JsepTransport without considering the BUNDLE group. Return nullptr
// if the JsepTransport is destroyed.
@@ -460,7 +461,8 @@
jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_);
// This keeps track of the mapping between media section
// (BaseChannel/SctpTransport) and the JsepTransport underneath.
- std::map<std::string, cricket::JsepTransport*> mid_to_transport_;
+ std::map<std::string, cricket::JsepTransport*> mid_to_transport_
+ RTC_GUARDED_BY(network_thread_);
// Keep track of mids that have been mapped to transports. Used for rollback.
std::vector<std::string> pending_mids_ RTC_GUARDED_BY(network_thread_);
// Aggregate states for Transports.
diff --git a/pc/jsep_transport_controller_unittest.cc b/pc/jsep_transport_controller_unittest.cc
index 5361f90..9efa205 100644
--- a/pc/jsep_transport_controller_unittest.cc
+++ b/pc/jsep_transport_controller_unittest.cc
@@ -904,6 +904,9 @@
EXPECT_EQ(2, candidates_signal_count_);
EXPECT_TRUE(!signaled_on_non_signaling_thread_);
+
+ network_thread_->Invoke<void>(RTC_FROM_HERE,
+ [&] { transport_controller_.reset(); });
}
// Test that if the TransportController was created with the
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index 406c44d..087cffc 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -489,12 +489,17 @@
sdp_handler_->ResetSessionDescFactory();
}
- transport_controller_.reset();
- // port_allocator_ lives on the network thread and should be destroyed there.
+ // port_allocator_ and transport_controller_ live on the network thread and
+ // should be destroyed there.
network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(network_thread());
+ transport_controller_.reset();
port_allocator_.reset();
+ if (network_thread_safety_) {
+ network_thread_safety_->SetNotAlive();
+ network_thread_safety_ = nullptr;
+ }
});
// call_ and event_log_ must be destroyed on the worker thread.
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
@@ -527,13 +532,15 @@
}
// The port allocator lives on the network thread and should be initialized
- // there.
+ // there. Also set up the task safety flag for canceling pending tasks on
+ // the network thread when closing.
// TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
// initialize all the |transport_controller_->Subscribe*| calls below on the
// network thread via this invoke.
const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
+ network_thread_safety_ = PendingTaskSafetyFlag::Create();
return InitializePortAllocator_n(stun_servers, turn_servers,
configuration);
});
@@ -832,6 +839,16 @@
return AddTransceiver(track, RtpTransceiverInit());
}
+RtpTransportInternal* PeerConnection::GetRtpTransport(const std::string& mid) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ return network_thread()->Invoke<RtpTransportInternal*>(
+ RTC_FROM_HERE, [this, &mid] {
+ auto rtp_transport = transport_controller_->GetRtpTransport(mid);
+ RTC_DCHECK(rtp_transport);
+ return rtp_transport;
+ });
+}
+
RTCErrorOr<rtc::scoped_refptr<RtpTransceiverInterface>>
PeerConnection::AddTransceiver(
rtc::scoped_refptr<MediaStreamTrackInterface> track,
@@ -1588,11 +1605,11 @@
rtc::scoped_refptr<SctpTransportInterface> PeerConnection::GetSctpTransport()
const {
- RTC_DCHECK_RUN_ON(signaling_thread());
- if (!sctp_mid_s_) {
+ RTC_DCHECK_RUN_ON(network_thread());
+ if (!sctp_mid_n_)
return nullptr;
- }
- return transport_controller_->GetSctpTransport(*sctp_mid_s_);
+
+ return transport_controller_->GetSctpTransport(*sctp_mid_n_);
}
const SessionDescriptionInterface* PeerConnection::local_description() const {
@@ -1673,11 +1690,16 @@
// WebRTC session description factory, the session description factory would
// call the transport controller.
sdp_handler_->ResetSessionDescFactory();
- transport_controller_.reset();
rtp_manager_->Close();
- network_thread()->Invoke<void>(
- RTC_FROM_HERE, [this] { port_allocator_->DiscardCandidatePool(); });
+ network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
+ transport_controller_.reset();
+ port_allocator_->DiscardCandidatePool();
+ if (network_thread_safety_) {
+ network_thread_safety_->SetNotAlive();
+ network_thread_safety_ = nullptr;
+ }
+ });
worker_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
RTC_DCHECK_RUN_ON(worker_thread());
@@ -1831,6 +1853,17 @@
}
}
+void PeerConnection::SetSctpDataMid(const std::string& mid) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ sctp_mid_s_ = mid;
+}
+
+void PeerConnection::ResetSctpDataMid() {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ sctp_mid_s_.reset();
+ sctp_transport_name_s_.clear();
+}
+
void PeerConnection::OnSctpDataChannelClosed(DataChannelInterface* channel) {
// Since data_channel_controller doesn't do signals, this
// signal is relayed here.
@@ -2044,13 +2077,8 @@
absl::optional<std::string> PeerConnection::sctp_transport_name() const {
RTC_DCHECK_RUN_ON(signaling_thread());
- if (sctp_mid_s_ && transport_controller_) {
- auto dtls_transport = transport_controller_->GetDtlsTransport(*sctp_mid_s_);
- if (dtls_transport) {
- return dtls_transport->transport_name();
- }
- return absl::optional<std::string>();
- }
+ if (sctp_mid_s_ && transport_controller_)
+ return sctp_transport_name_s_;
return absl::optional<std::string>();
}
@@ -2289,6 +2317,15 @@
data_channel_controller_.set_data_channel_transport(transport);
data_channel_controller_.SetupDataChannelTransport_n();
sctp_mid_n_ = mid;
+ auto dtls_transport = transport_controller_->GetDtlsTransport(mid);
+ if (dtls_transport) {
+ signaling_thread()->PostTask(
+ ToQueuedTask(signaling_thread_safety_.flag(),
+ [this, name = dtls_transport->transport_name()] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ sctp_transport_name_s_ = std::move(name);
+ }));
+ }
// Note: setting the data sink and checking initial state must be done last,
// after setting up the data channel. Setting the data sink may trigger
@@ -2633,9 +2670,19 @@
if (base_channel) {
ret = base_channel->SetRtpTransport(rtp_transport);
}
+
if (mid == sctp_mid_n_) {
data_channel_controller_.OnTransportChanged(data_channel_transport);
+ if (dtls_transport) {
+ signaling_thread()->PostTask(ToQueuedTask(
+ signaling_thread_safety_.flag(),
+ [this, name = dtls_transport->internal()->transport_name()] {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ sctp_transport_name_s_ = std::move(name);
+ }));
+ }
}
+
return ret;
}
@@ -2645,6 +2692,23 @@
return observer_;
}
+void PeerConnection::StartSctpTransport(int local_port,
+ int remote_port,
+ int max_message_size) {
+ RTC_DCHECK_RUN_ON(signaling_thread());
+ if (!sctp_mid_s_)
+ return;
+
+ network_thread()->PostTask(ToQueuedTask(
+ network_thread_safety_,
+ [this, mid = *sctp_mid_s_, local_port, remote_port, max_message_size] {
+ rtc::scoped_refptr<SctpTransport> sctp_transport =
+ transport_controller()->GetSctpTransport(mid);
+ if (sctp_transport)
+ sctp_transport->Start(local_port, remote_port, max_message_size);
+ }));
+}
+
CryptoOptions PeerConnection::GetCryptoOptions() {
RTC_DCHECK_RUN_ON(signaling_thread());
// TODO(bugs.webrtc.org/9891) - Remove PeerConnectionFactory::CryptoOptions
diff --git a/pc/peer_connection.h b/pc/peer_connection.h
index 4bab90a..92e33d2 100644
--- a/pc/peer_connection.h
+++ b/pc/peer_connection.h
@@ -404,14 +404,15 @@
// channels are configured this will return nullopt.
absl::optional<std::string> GetDataMid() const;
- void SetSctpDataMid(const std::string& mid) {
- RTC_DCHECK_RUN_ON(signaling_thread());
- sctp_mid_s_ = mid;
- }
- void ResetSctpDataMid() {
- RTC_DCHECK_RUN_ON(signaling_thread());
- sctp_mid_s_.reset();
- }
+ void SetSctpDataMid(const std::string& mid);
+
+ void ResetSctpDataMid();
+
+ // Asynchronously calls SctpTransport::Start() on the network thread for
+ // |sctp_mid()| if set. Called as part of setting the local description.
+ void StartSctpTransport(int local_port,
+ int remote_port,
+ int max_message_size);
// Returns the CryptoOptions for this PeerConnection. This will always
// return the RTCConfiguration.crypto_options if set and will only default
@@ -427,12 +428,7 @@
bool fire_callback = true);
// Returns rtp transport, result can not be nullptr.
- RtpTransportInternal* GetRtpTransport(const std::string& mid) {
- RTC_DCHECK_RUN_ON(signaling_thread());
- auto rtp_transport = transport_controller_->GetRtpTransport(mid);
- RTC_DCHECK(rtp_transport);
- return rtp_transport;
- }
+ RtpTransportInternal* GetRtpTransport(const std::string& mid);
// Returns true if SRTP (either using DTLS-SRTP or SDES) is required by
// this session.
@@ -648,6 +644,8 @@
// The unique_ptr belongs to the worker thread, but the Call object manages
// its own thread safety.
std::unique_ptr<Call> call_ RTC_GUARDED_BY(worker_thread());
+ ScopedTaskSafety signaling_thread_safety_;
+ rtc::scoped_refptr<PendingTaskSafetyFlag> network_thread_safety_;
std::unique_ptr<ScopedTaskSafety> call_safety_
RTC_GUARDED_BY(worker_thread());
@@ -677,6 +675,7 @@
// thread, but applied first on the networking thread via an invoke().
absl::optional<std::string> sctp_mid_s_ RTC_GUARDED_BY(signaling_thread());
absl::optional<std::string> sctp_mid_n_ RTC_GUARDED_BY(network_thread());
+ std::string sctp_transport_name_s_ RTC_GUARDED_BY(signaling_thread());
// The machinery for handling offers and answers. Const after initialization.
std::unique_ptr<SdpOfferAnswerHandler> sdp_handler_
diff --git a/pc/peer_connection_factory.cc b/pc/peer_connection_factory.cc
index c65b2f5..a8d64fa 100644
--- a/pc/peer_connection_factory.cc
+++ b/pc/peer_connection_factory.cc
@@ -265,8 +265,15 @@
if (!result.ok()) {
return result.MoveError();
}
+ // We configure the proxy with a pointer to the network thread for methods
+ // that need to be invoked there rather than on the signaling thread.
+ // Internally, the proxy object has a member variable named |worker_thread_|
+ // which will point to the network thread (and not the factory's
+ // worker_thread()). All such methods have thread checks though, so the code
+ // should still be clear (outside of macro expansion).
rtc::scoped_refptr<PeerConnectionInterface> result_proxy =
- PeerConnectionProxy::Create(signaling_thread(), result.MoveValue());
+ PeerConnectionProxy::Create(signaling_thread(), network_thread(),
+ result.MoveValue());
return result_proxy;
}
diff --git a/pc/peer_connection_integrationtest.cc b/pc/peer_connection_integrationtest.cc
index 4a25619..4ed92ad 100644
--- a/pc/peer_connection_integrationtest.cc
+++ b/pc/peer_connection_integrationtest.cc
@@ -5969,9 +5969,11 @@
callee()->AddAudioVideoTracks();
caller()->CreateAndSetAndSignalOffer();
ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout);
- ASSERT_EQ_WAIT(SctpTransportState::kConnected,
- caller()->pc()->GetSctpTransport()->Information().state(),
- kDefaultTimeout);
+ network_thread()->Invoke<void>(RTC_FROM_HERE, [this] {
+ ASSERT_EQ_WAIT(SctpTransportState::kConnected,
+ caller()->pc()->GetSctpTransport()->Information().state(),
+ kDefaultTimeout);
+ });
ASSERT_TRUE_WAIT(callee()->data_channel(), kDefaultTimeout);
ASSERT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout);
}
diff --git a/pc/sctp_transport.cc b/pc/sctp_transport.cc
index f3e40b8..bb579af 100644
--- a/pc/sctp_transport.cc
+++ b/pc/sctp_transport.cc
@@ -45,7 +45,15 @@
}
SctpTransportInformation SctpTransport::Information() const {
- MutexLock lock(&lock_);
+ // TODO(tommi): Update PeerConnection::GetSctpTransport to hand out a proxy
+ // to the transport so that we can be sure that methods get called on the
+ // expected thread. Chromium currently calls this method from
+ // TransceiverStateSurfacer.
+ if (!owner_thread_->IsCurrent()) {
+ return owner_thread_->Invoke<SctpTransportInformation>(
+ RTC_FROM_HERE, [this] { return Information(); });
+ }
+ RTC_DCHECK_RUN_ON(owner_thread_);
return info_;
}
@@ -71,103 +79,78 @@
void SctpTransport::Clear() {
RTC_DCHECK_RUN_ON(owner_thread_);
RTC_DCHECK(internal());
- {
- MutexLock lock(&lock_);
- // Note that we delete internal_sctp_transport_, but
- // only drop the reference to dtls_transport_.
- dtls_transport_ = nullptr;
- internal_sctp_transport_ = nullptr;
- }
+ // Note that we delete internal_sctp_transport_, but
+ // only drop the reference to dtls_transport_.
+ dtls_transport_ = nullptr;
+ internal_sctp_transport_ = nullptr;
UpdateInformation(SctpTransportState::kClosed);
}
void SctpTransport::SetDtlsTransport(
rtc::scoped_refptr<DtlsTransport> transport) {
RTC_DCHECK_RUN_ON(owner_thread_);
- SctpTransportState next_state;
- {
- MutexLock lock(&lock_);
- next_state = info_.state();
- dtls_transport_ = transport;
- if (internal_sctp_transport_) {
- if (transport) {
- internal_sctp_transport_->SetDtlsTransport(transport->internal());
- transport->internal()->SignalDtlsState.connect(
- this, &SctpTransport::OnDtlsStateChange);
- if (info_.state() == SctpTransportState::kNew) {
- next_state = SctpTransportState::kConnecting;
- }
- } else {
- internal_sctp_transport_->SetDtlsTransport(nullptr);
+ SctpTransportState next_state = info_.state();
+ dtls_transport_ = transport;
+ if (internal_sctp_transport_) {
+ if (transport) {
+ internal_sctp_transport_->SetDtlsTransport(transport->internal());
+ transport->internal()->SignalDtlsState.connect(
+ this, &SctpTransport::OnDtlsStateChange);
+ if (info_.state() == SctpTransportState::kNew) {
+ next_state = SctpTransportState::kConnecting;
}
+ } else {
+ internal_sctp_transport_->SetDtlsTransport(nullptr);
}
}
+
UpdateInformation(next_state);
}
void SctpTransport::Start(int local_port,
int remote_port,
int max_message_size) {
- {
- MutexLock lock(&lock_);
- // Record max message size on calling thread.
- info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
- max_message_size, info_.MaxChannels());
- }
- if (owner_thread_->IsCurrent()) {
- if (!internal()->Start(local_port, remote_port, max_message_size)) {
- RTC_LOG(LS_ERROR) << "Failed to push down SCTP parameters, closing.";
- UpdateInformation(SctpTransportState::kClosed);
- }
- } else {
- owner_thread_->Invoke<void>(
- RTC_FROM_HERE, [this, local_port, remote_port, max_message_size] {
- Start(local_port, remote_port, max_message_size);
- });
+ RTC_DCHECK_RUN_ON(owner_thread_);
+ info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
+ max_message_size, info_.MaxChannels());
+
+ if (!internal()->Start(local_port, remote_port, max_message_size)) {
+ RTC_LOG(LS_ERROR) << "Failed to push down SCTP parameters, closing.";
+ UpdateInformation(SctpTransportState::kClosed);
}
}
void SctpTransport::UpdateInformation(SctpTransportState state) {
RTC_DCHECK_RUN_ON(owner_thread_);
- bool must_send_update;
- SctpTransportInformation info_copy(SctpTransportState::kNew);
- {
- MutexLock lock(&lock_);
- must_send_update = (state != info_.state());
- // TODO(https://bugs.webrtc.org/10358): Update max channels from internal
- // SCTP transport when available.
- if (internal_sctp_transport_) {
- info_ = SctpTransportInformation(
- state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
- } else {
- info_ = SctpTransportInformation(
- state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
- }
- if (observer_ && must_send_update) {
- info_copy = info_;
- }
+ bool must_send_update = (state != info_.state());
+ // TODO(https://bugs.webrtc.org/10358): Update max channels from internal
+ // SCTP transport when available.
+ if (internal_sctp_transport_) {
+ info_ = SctpTransportInformation(
+ state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
+ } else {
+ info_ = SctpTransportInformation(
+ state, dtls_transport_, info_.MaxMessageSize(), info_.MaxChannels());
}
- // We call the observer without holding the lock.
+
if (observer_ && must_send_update) {
- observer_->OnStateChange(info_copy);
+ observer_->OnStateChange(info_);
}
}
void SctpTransport::OnAssociationChangeCommunicationUp() {
RTC_DCHECK_RUN_ON(owner_thread_);
- {
- MutexLock lock(&lock_);
- RTC_DCHECK(internal_sctp_transport_);
- if (internal_sctp_transport_->max_outbound_streams() &&
- internal_sctp_transport_->max_inbound_streams()) {
- int max_channels =
- std::min(*(internal_sctp_transport_->max_outbound_streams()),
- *(internal_sctp_transport_->max_inbound_streams()));
- // Record max channels.
- info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
- info_.MaxMessageSize(), max_channels);
- }
+ RTC_DCHECK(internal_sctp_transport_);
+ if (internal_sctp_transport_->max_outbound_streams() &&
+ internal_sctp_transport_->max_inbound_streams()) {
+ int max_channels =
+ std::min(*(internal_sctp_transport_->max_outbound_streams()),
+ *(internal_sctp_transport_->max_inbound_streams()));
+ // Record max channels.
+ info_ = SctpTransportInformation(info_.state(), info_.dtls_transport(),
+ info_.MaxMessageSize(), max_channels);
}
+
UpdateInformation(SctpTransportState::kConnected);
}
diff --git a/pc/sctp_transport.h b/pc/sctp_transport.h
index d916a00..4bb4274 100644
--- a/pc/sctp_transport.h
+++ b/pc/sctp_transport.h
@@ -20,7 +20,6 @@
#include "media/sctp/sctp_transport_internal.h"
#include "p2p/base/dtls_transport_internal.h"
#include "pc/dtls_transport.h"
-#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/third_party/sigslot/sigslot.h"
#include "rtc_base/thread.h"
#include "rtc_base/thread_annotations.h"
@@ -54,12 +53,12 @@
// internal() to be functions on the webrtc::SctpTransport interface,
// and make the internal() function private.
cricket::SctpTransportInternal* internal() {
- MutexLock lock(&lock_);
+ RTC_DCHECK_RUN_ON(owner_thread_);
return internal_sctp_transport_.get();
}
const cricket::SctpTransportInternal* internal() const {
- MutexLock lock(&lock_);
+ RTC_DCHECK_RUN_ON(owner_thread_);
return internal_sctp_transport_.get();
}
@@ -75,15 +74,12 @@
void OnDtlsStateChange(cricket::DtlsTransportInternal* transport,
cricket::DtlsTransportState state);
- // Note - owner_thread never changes, but can't be const if we do
- // Invoke() on it.
- rtc::Thread* owner_thread_;
- mutable Mutex lock_;
- // Variables accessible off-thread, guarded by lock_
- SctpTransportInformation info_ RTC_GUARDED_BY(lock_);
+ // NOTE: |owner_thread_| is the thread that the SctpTransport object is
+ // constructed on. In the context of PeerConnection, it's the network thread.
+ rtc::Thread* const owner_thread_;
+ SctpTransportInformation info_ RTC_GUARDED_BY(owner_thread_);
std::unique_ptr<cricket::SctpTransportInternal> internal_sctp_transport_
- RTC_GUARDED_BY(lock_);
- // Variables only accessed on-thread
+ RTC_GUARDED_BY(owner_thread_);
SctpTransportObserverInterface* observer_ RTC_GUARDED_BY(owner_thread_) =
nullptr;
rtc::scoped_refptr<DtlsTransport> dtls_transport_
diff --git a/pc/sdp_offer_answer.cc b/pc/sdp_offer_answer.cc
index 4dd5b6f..9fa4188 100644
--- a/pc/sdp_offer_answer.cc
+++ b/pc/sdp_offer_answer.cc
@@ -729,6 +729,21 @@
return true;
}
+rtc::scoped_refptr<webrtc::DtlsTransport> LookupDtlsTransportByMid(
+ rtc::Thread* network_thread,
+ JsepTransportController* controller,
+ const std::string& mid) {
+ // TODO(tommi): Can we post this (and associated operations where this
+ // function is called) to the network thread and avoid this Invoke?
+ // We might be able to simplify a few things if we set the transport on
+ // the network thread and then update the implementation to check that
+ // the set_ and relevant get methods are always called on the network
+ // thread (we'll need to update proxy maps).
+ return network_thread->Invoke<rtc::scoped_refptr<webrtc::DtlsTransport>>(
+ RTC_FROM_HERE,
+ [controller, &mid] { return controller->LookupDtlsTransportByMid(mid); });
+}
+
} // namespace
// Used by parameterless SetLocalDescription() to create an offer or answer.
@@ -1308,8 +1323,8 @@
// Note that code paths that don't set MID won't be able to use
// information about DTLS transports.
if (transceiver->mid()) {
- auto dtls_transport = transport_controller()->LookupDtlsTransportByMid(
- *transceiver->mid());
+ auto dtls_transport = LookupDtlsTransportByMid(
+ pc_->network_thread(), transport_controller(), *transceiver->mid());
transceiver->internal()->sender_internal()->set_transport(
dtls_transport);
transceiver->internal()->receiver_internal()->set_transport(
@@ -1725,9 +1740,9 @@
transceiver->internal()->set_current_direction(local_direction);
// 2.2.8.1.11.[3-6]: Set the transport internal slots.
if (transceiver->mid()) {
- auto dtls_transport =
- transport_controller()->LookupDtlsTransportByMid(
- *transceiver->mid());
+ auto dtls_transport = LookupDtlsTransportByMid(pc_->network_thread(),
+ transport_controller(),
+ *transceiver->mid());
transceiver->internal()->sender_internal()->set_transport(
dtls_transport);
transceiver->internal()->receiver_internal()->set_transport(
@@ -4276,13 +4291,11 @@
// Need complete offer/answer with an SCTP m= section before starting SCTP,
// according to https://tools.ietf.org/html/draft-ietf-mmusic-sctp-sdp-19
if (pc_->sctp_mid() && local_description() && remote_description()) {
- rtc::scoped_refptr<SctpTransport> sctp_transport =
- transport_controller()->GetSctpTransport(*(pc_->sctp_mid()));
auto local_sctp_description = cricket::GetFirstSctpDataContentDescription(
local_description()->description());
auto remote_sctp_description = cricket::GetFirstSctpDataContentDescription(
remote_description()->description());
- if (sctp_transport && local_sctp_description && remote_sctp_description) {
+ if (local_sctp_description && remote_sctp_description) {
int max_message_size;
// A remote max message size of zero means "any size supported".
// We configure the connection with our own max message size.
@@ -4293,8 +4306,9 @@
std::min(local_sctp_description->max_message_size(),
remote_sctp_description->max_message_size());
}
- sctp_transport->Start(local_sctp_description->port(),
- remote_sctp_description->port(), max_message_size);
+ pc_->StartSctpTransport(local_sctp_description->port(),
+ remote_sctp_description->port(),
+ max_message_size);
}
}
@@ -4520,8 +4534,16 @@
return false;
}
- std::string transport_name = GetTransportName(result.value()->name);
- return !transport_name.empty();
+ bool has_transport = false;
+ cricket::ChannelInterface* channel = pc_->GetChannel(result.value()->name);
+ if (channel) {
+ has_transport = !channel->transport_name().empty();
+ } else if (data_channel_controller()->data_channel_transport()) {
+ auto sctp_mid = pc_->sctp_mid();
+ RTC_DCHECK(sctp_mid);
+ has_transport = (result.value()->name == *sctp_mid);
+ }
+ return has_transport;
}
void SdpOfferAnswerHandler::ReportRemoteIceCandidateAdded(
@@ -4644,6 +4666,7 @@
cricket::VideoChannel* SdpOfferAnswerHandler::CreateVideoChannel(
const std::string& mid) {
RTC_DCHECK_RUN_ON(signaling_thread());
+ // NOTE: This involves a non-ideal hop (Invoke) over to the network thread.
RtpTransportInternal* rtp_transport = pc_->GetRtpTransport(mid);
// TODO(bugs.webrtc.org/11992): CreateVideoChannel internally switches to the