Fix unsynchronized access to jsep_transports_by_name_.
Also removing need for lock for ice restart flag, fix call paths and
add information about how JsepTransportController's events could live
fully on the network thread and complexity around signaling thread
should be handled by PeerConnection (more details in webrtc:12427).
Bug: webrtc:12426, webrtc:12427
Change-Id: I9b1fae8acf16d90d9716054fc3c390700877a82a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205221
Reviewed-by: Niels Moller <nisse@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33159}
diff --git a/pc/jsep_transport.cc b/pc/jsep_transport.cc
index 787e9b6..2d7347b 100644
--- a/pc/jsep_transport.cc
+++ b/pc/jsep_transport.cc
@@ -232,13 +232,11 @@
local_description_.reset();
return error;
}
- {
- webrtc::MutexLock lock(&accessor_lock_);
- if (needs_ice_restart_ && ice_restarting) {
- needs_ice_restart_ = false;
- RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport "
- << mid();
- }
+
+ if (needs_ice_restart_ && ice_restarting) {
+ needs_ice_restart_ = false;
+ RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag cleared for transport "
+ << mid();
}
return webrtc::RTCError::OK();
@@ -341,7 +339,7 @@
}
void JsepTransport::SetNeedsIceRestartFlag() {
- webrtc::MutexLock lock(&accessor_lock_);
+ RTC_DCHECK_RUN_ON(network_thread_);
if (!needs_ice_restart_) {
needs_ice_restart_ = true;
RTC_LOG(LS_VERBOSE) << "needs-ice-restart flag set for transport " << mid();
diff --git a/pc/jsep_transport.h b/pc/jsep_transport.h
index 0260b93..2199f5e 100644
--- a/pc/jsep_transport.h
+++ b/pc/jsep_transport.h
@@ -141,16 +141,14 @@
// set, offers should generate new ufrags/passwords until an ICE restart
// occurs.
//
- // This and the below method can be called safely from any thread as long as
- // SetXTransportDescription is not in progress.
- // TODO(tommi): Investigate on which threads (network or signal?) we really
- // need to access the needs_ice_restart flag.
- void SetNeedsIceRestartFlag() RTC_LOCKS_EXCLUDED(accessor_lock_);
+ // This and |needs_ice_restart()| must be called on the network thread.
+ void SetNeedsIceRestartFlag();
+
// Returns true if the ICE restart flag above was set, and no ICE restart has
// occurred yet for this transport (by applying a local description with
// changed ufrag/password).
- bool needs_ice_restart() const RTC_LOCKS_EXCLUDED(accessor_lock_) {
- webrtc::MutexLock lock(&accessor_lock_);
+ bool needs_ice_restart() const {
+ RTC_DCHECK_RUN_ON(network_thread_);
return needs_ice_restart_;
}
@@ -335,7 +333,7 @@
mutable webrtc::Mutex accessor_lock_;
const std::string mid_;
// needs-ice-restart bit as described in JSEP.
- bool needs_ice_restart_ RTC_GUARDED_BY(accessor_lock_) = false;
+ bool needs_ice_restart_ RTC_GUARDED_BY(network_thread_) = false;
rtc::scoped_refptr<rtc::RTCCertificate> local_certificate_
RTC_GUARDED_BY(network_thread_);
std::unique_ptr<JsepTransportDescription> local_description_
diff --git a/pc/jsep_transport_controller.cc b/pc/jsep_transport_controller.cc
index 045c991..be04947 100644
--- a/pc/jsep_transport_controller.cc
+++ b/pc/jsep_transport_controller.cc
@@ -210,6 +210,7 @@
}
void JsepTransportController::SetNeedsIceRestartFlag() {
+ RTC_DCHECK_RUN_ON(network_thread_);
for (auto& kv : jsep_transports_by_name_) {
kv.second->SetNeedsIceRestartFlag();
}
@@ -217,6 +218,14 @@
bool JsepTransportController::NeedsIceRestart(
const std::string& transport_name) const {
+ if (!network_thread_->IsCurrent()) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
+ return network_thread_->Invoke<bool>(
+ RTC_FROM_HERE, [&] { return NeedsIceRestart(transport_name); });
+ }
+
+ RTC_DCHECK_RUN_ON(network_thread_);
+
const cricket::JsepTransport* transport =
GetJsepTransportByName(transport_name);
if (!transport) {
@@ -246,6 +255,8 @@
RTC_FROM_HERE, [&] { return SetLocalCertificate(certificate); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
// Can't change a certificate, or set a null certificate.
if (certificate_ || !certificate) {
return false;
@@ -273,6 +284,8 @@
RTC_FROM_HERE, [&] { return GetLocalCertificate(transport_name); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
const cricket::JsepTransport* t = GetJsepTransportByName(transport_name);
if (!t) {
return nullptr;
@@ -287,6 +300,7 @@
return network_thread_->Invoke<std::unique_ptr<rtc::SSLCertChain>>(
RTC_FROM_HERE, [&] { return GetRemoteSSLCertChain(transport_name); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
// Get the certificate from the RTP transport's DTLS handshake. Should be
// identical to the RTCP transport's, since they were given the same remote
@@ -324,6 +338,8 @@
});
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
// Verify each candidate before passing down to the transport layer.
RTCError error = VerifyCandidates(candidates);
if (!error.ok()) {
@@ -345,6 +361,8 @@
RTC_FROM_HERE, [&] { return RemoveRemoteCandidates(candidates); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
// Verify each candidate before passing down to the transport layer.
RTCError error = VerifyCandidates(candidates);
if (!error.ok()) {
@@ -392,6 +410,8 @@
RTC_FROM_HERE, [=] { return GetStats(transport_name, stats); });
}
+ RTC_DCHECK_RUN_ON(network_thread_);
+
cricket::JsepTransport* transport = GetJsepTransportByName(transport_name);
if (!transport) {
return false;
@@ -450,7 +470,7 @@
JsepTransportController::CreateDtlsTransport(
const cricket::ContentInfo& content_info,
cricket::IceTransportInternal* ice) {
- RTC_DCHECK(network_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(network_thread_);
std::unique_ptr<cricket::DtlsTransportInternal> dtls;
@@ -504,7 +524,7 @@
const std::string& transport_name,
rtc::PacketTransportInternal* rtp_packet_transport,
rtc::PacketTransportInternal* rtcp_packet_transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(network_thread_);
auto unencrypted_rtp_transport =
std::make_unique<RtpTransport>(rtcp_packet_transport == nullptr);
unencrypted_rtp_transport->SetRtpPacketTransport(rtp_packet_transport);
@@ -519,7 +539,7 @@
const std::string& transport_name,
cricket::DtlsTransportInternal* rtp_dtls_transport,
cricket::DtlsTransportInternal* rtcp_dtls_transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(network_thread_);
auto srtp_transport =
std::make_unique<webrtc::SrtpTransport>(rtcp_dtls_transport == nullptr);
RTC_DCHECK(rtp_dtls_transport);
@@ -555,6 +575,7 @@
std::vector<cricket::DtlsTransportInternal*>
JsepTransportController::GetDtlsTransports() {
+ RTC_DCHECK_RUN_ON(network_thread_);
std::vector<cricket::DtlsTransportInternal*> dtls_transports;
for (auto it = jsep_transports_by_name_.begin();
it != jsep_transports_by_name_.end(); ++it) {
@@ -1066,8 +1087,6 @@
}
void JsepTransportController::DestroyAllJsepTransports_n() {
- RTC_DCHECK(network_thread_->IsCurrent());
-
for (const auto& jsep_transport : jsep_transports_by_name_) {
config_.transport_observer->OnTransportChanged(jsep_transport.first,
nullptr, nullptr, nullptr);
@@ -1077,10 +1096,9 @@
}
void JsepTransportController::SetIceRole_n(cricket::IceRole ice_role) {
- RTC_DCHECK(network_thread_->IsCurrent());
-
ice_role_ = ice_role;
- for (auto& dtls : GetDtlsTransports()) {
+ auto dtls_transports = GetDtlsTransports();
+ for (auto& dtls : dtls_transports) {
dtls->ice_transport()->SetIceRole(ice_role_);
}
}
@@ -1135,7 +1153,6 @@
void JsepTransportController::OnTransportWritableState_n(
rtc::PacketTransportInternal* transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
RTC_LOG(LS_INFO) << " Transport " << transport->transport_name()
<< " writability changed to " << transport->writable()
<< ".";
@@ -1144,27 +1161,25 @@
void JsepTransportController::OnTransportReceivingState_n(
rtc::PacketTransportInternal* transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
UpdateAggregateStates_n();
}
void JsepTransportController::OnTransportGatheringState_n(
cricket::IceTransportInternal* transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
UpdateAggregateStates_n();
}
void JsepTransportController::OnTransportCandidateGathered_n(
cricket::IceTransportInternal* transport,
const cricket::Candidate& candidate) {
- RTC_DCHECK(network_thread_->IsCurrent());
-
// We should never signal peer-reflexive candidates.
if (candidate.type() == cricket::PRFLX_PORT_TYPE) {
RTC_NOTREACHED();
return;
}
std::string transport_name = transport->transport_name();
+ // TODO(bugs.webrtc.org/12427): See if we can get rid of this. We should be
+ // able to just call this directly here.
invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, signaling_thread_, [this, transport_name, candidate] {
signal_ice_candidates_gathered_.Send(
@@ -1175,8 +1190,6 @@
void JsepTransportController::OnTransportCandidateError_n(
cricket::IceTransportInternal* transport,
const cricket::IceCandidateErrorEvent& event) {
- RTC_DCHECK(network_thread_->IsCurrent());
-
invoker_.AsyncInvoke<void>(RTC_FROM_HERE, signaling_thread_, [this, event] {
signal_ice_candidate_error_.Send(event);
});
@@ -1197,7 +1210,6 @@
void JsepTransportController::OnTransportRoleConflict_n(
cricket::IceTransportInternal* transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
// Note: since the role conflict is handled entirely on the network thread,
// we don't need to worry about role conflicts occurring on two ports at
// once. The first one encountered should immediately reverse the role.
@@ -1214,7 +1226,6 @@
void JsepTransportController::OnTransportStateChanged_n(
cricket::IceTransportInternal* transport) {
- RTC_DCHECK(network_thread_->IsCurrent());
RTC_LOG(LS_INFO) << transport->transport_name() << " Transport "
<< transport->component()
<< " state changed. Check if state is complete.";
@@ -1222,8 +1233,6 @@
}
void JsepTransportController::UpdateAggregateStates_n() {
- RTC_DCHECK(network_thread_->IsCurrent());
-
auto dtls_transports = GetDtlsTransports();
cricket::IceConnectionState new_connection_state =
cricket::kIceConnectionConnecting;
diff --git a/pc/jsep_transport_controller.h b/pc/jsep_transport_controller.h
index 3dab284..506a418 100644
--- a/pc/jsep_transport_controller.h
+++ b/pc/jsep_transport_controller.h
@@ -227,6 +227,8 @@
// F: void(const std::string&, const std::vector<cricket::Candidate>&)
template <typename F>
void SubscribeIceCandidateGathered(F&& callback) {
+ // TODO(bugs.webrtc.org/12427): Post this subscription to the network
+ // thread.
signal_ice_candidates_gathered_.AddReceiver(std::forward<F>(callback));
}
@@ -294,6 +296,7 @@
CallbackList<cricket::IceGatheringState> signal_ice_gathering_state_;
// [mid, candidates]
+ // TODO(bugs.webrtc.org/12427): Protect this with network_thread_.
CallbackList<const std::string&, const std::vector<cricket::Candidate>&>
signal_ice_candidates_gathered_;
@@ -366,9 +369,9 @@
// Get the JsepTransport without considering the BUNDLE group. Return nullptr
// if the JsepTransport is destroyed.
const cricket::JsepTransport* GetJsepTransportByName(
- const std::string& transport_name) const;
+ const std::string& transport_name) const RTC_RUN_ON(network_thread_);
cricket::JsepTransport* GetJsepTransportByName(
- const std::string& transport_name);
+ const std::string& transport_name) RTC_RUN_ON(network_thread_);
// Creates jsep transport. Noop if transport is already created.
// Transport is created either during SetLocalDescription (|local| == true) or
@@ -454,7 +457,7 @@
AsyncResolverFactory* const async_resolver_factory_ = nullptr;
std::map<std::string, std::unique_ptr<cricket::JsepTransport>>
- jsep_transports_by_name_;
+ jsep_transports_by_name_ RTC_GUARDED_BY(network_thread_);
// This keeps track of the mapping between media section
// (BaseChannel/SctpTransport) and the JsepTransport underneath.
std::map<std::string, cricket::JsepTransport*> mid_to_transport_;
diff --git a/pc/peer_connection.cc b/pc/peer_connection.cc
index c3ffa29..2cb43bf 100644
--- a/pc/peer_connection.cc
+++ b/pc/peer_connection.cc
@@ -528,6 +528,9 @@
// The port allocator lives on the network thread and should be initialized
// there.
+ // TODO(bugs.webrtc.org/12427): See if we can piggyback on this call and
+ // initialize all the |transport_controller_->Subscribe*| calls below on the
+ // network thread via this invoke.
const auto pa_result =
network_thread()->Invoke<InitializePortAllocatorResult>(
RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &configuration] {
@@ -620,6 +623,10 @@
// due to lack of unit tests which trigger these scenarios.
// TODO(bugs.webrtc.org/12160): Remove above comments.
// callbacks for signaling_thread.
+ // TODO(bugs.webrtc.org/12427): If we can't piggyback on the above network
+ // Invoke(), then perhaps we could post these subscription calls to the
+ // network thread so that the transport controller doesn't have to do the
+ // signaling/network handling internally and use AsyncInvoker.
transport_controller_->SubscribeIceConnectionState(
[this](cricket::IceConnectionState s) {
RTC_DCHECK_RUN_ON(signaling_thread());
@@ -1379,10 +1386,29 @@
const bool has_local_description = local_description() != nullptr;
- // In theory this shouldn't fail.
+ const bool needs_ice_restart =
+ modified_config.servers != configuration_.servers ||
+ NeedIceRestart(
+ configuration_.surface_ice_candidates_on_ice_transport_type_changed,
+ configuration_.type, modified_config.type) ||
+ modified_config.GetTurnPortPrunePolicy() !=
+ configuration_.GetTurnPortPrunePolicy();
+ cricket::IceConfig ice_config = ParseIceConfig(modified_config);
+
+ // Apply part of the configuration on the network thread. In theory this
+ // shouldn't fail.
if (!network_thread()->Invoke<bool>(
- RTC_FROM_HERE, [this, &stun_servers, &turn_servers, &modified_config,
- has_local_description] {
+ RTC_FROM_HERE,
+ [this, needs_ice_restart, &ice_config, &stun_servers, &turn_servers,
+ &modified_config, has_local_description] {
+ // As described in JSEP, calling setConfiguration with new ICE
+ // servers or candidate policy must set a "needs-ice-restart" bit so
+ // that the next offer triggers an ICE restart which will pick up
+ // the changes.
+ if (needs_ice_restart)
+ transport_controller_->SetNeedsIceRestartFlag();
+
+ transport_controller_->SetIceConfig(ice_config);
return ReconfigurePortAllocator_n(
stun_servers, turn_servers, modified_config.type,
modified_config.ice_candidate_pool_size,
@@ -1395,20 +1421,6 @@
"Failed to apply configuration to PortAllocator.");
}
- // As described in JSEP, calling setConfiguration with new ICE servers or
- // candidate policy must set a "needs-ice-restart" bit so that the next offer
- // triggers an ICE restart which will pick up the changes.
- if (modified_config.servers != configuration_.servers ||
- NeedIceRestart(
- configuration_.surface_ice_candidates_on_ice_transport_type_changed,
- configuration_.type, modified_config.type) ||
- modified_config.GetTurnPortPrunePolicy() !=
- configuration_.GetTurnPortPrunePolicy()) {
- transport_controller_->SetNeedsIceRestartFlag();
- }
-
- transport_controller_->SetIceConfig(ParseIceConfig(modified_config));
-
if (configuration_.active_reset_srtp_params !=
modified_config.active_reset_srtp_params) {
transport_controller_->SetActiveResetSrtpParams(
@@ -2155,6 +2167,8 @@
void PeerConnection::OnTransportControllerCandidatesGathered(
const std::string& transport_name,
const cricket::Candidates& candidates) {
+ // TODO(bugs.webrtc.org/12427): Expect this to come in on the network thread
+ // (not signaling as it currently does), handle appropriately.
int sdp_mline_index;
if (!GetLocalCandidateMediaIndex(transport_name, &sdp_mline_index)) {
RTC_LOG(LS_ERROR)
diff --git a/pc/webrtc_session_description_factory.cc b/pc/webrtc_session_description_factory.cc
index 2a9dc3f..348016d 100644
--- a/pc/webrtc_session_description_factory.cc
+++ b/pc/webrtc_session_description_factory.cc
@@ -194,7 +194,7 @@
}
WebRtcSessionDescriptionFactory::~WebRtcSessionDescriptionFactory() {
- RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(signaling_thread_);
// Fail any requests that were asked for before identity generation completed.
FailPendingRequests(kFailedDueToSessionShutdown);
@@ -222,6 +222,7 @@
CreateSessionDescriptionObserver* observer,
const PeerConnectionInterface::RTCOfferAnswerOptions& options,
const cricket::MediaSessionOptions& session_options) {
+ RTC_DCHECK_RUN_ON(signaling_thread_);
std::string error = "CreateOffer";
if (certificate_request_state_ == CERTIFICATE_FAILED) {
error += kFailedDueToIdentityFailed;
@@ -441,7 +442,7 @@
void WebRtcSessionDescriptionFactory::FailPendingRequests(
const std::string& reason) {
- RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(signaling_thread_);
while (!create_session_description_requests_.empty()) {
const CreateSessionDescriptionRequest& request =
create_session_description_requests_.front();
@@ -476,7 +477,7 @@
}
void WebRtcSessionDescriptionFactory::OnCertificateRequestFailed() {
- RTC_DCHECK(signaling_thread_->IsCurrent());
+ RTC_DCHECK_RUN_ON(signaling_thread_);
RTC_LOG(LS_ERROR) << "Asynchronous certificate generation request failed.";
certificate_request_state_ = CERTIFICATE_FAILED;