Thinning out the Transport class.
Connecting TransportChannelImpls directly to the TransportController,
and removing redundant signal forwarding/state aggregating code from
Transport. This brings us closer to just getting rid of Transport
entirely.
R=pthatcher@webrtc.org
Review URL: https://codereview.webrtc.org/1380563002 .
Cr-Original-Commit-Position: refs/heads/master@{#10120}
Cr-Mirrored-From: https://chromium.googlesource.com/external/webrtc
Cr-Mirrored-Commit: c4d3a5d44c25fb42c26393b6ddc0feadd52e5e2f
diff --git a/p2p/base/dtlstransportchannel_unittest.cc b/p2p/base/dtlstransportchannel_unittest.cc
index 95696e2..e62563e 100644
--- a/p2p/base/dtlstransportchannel_unittest.cc
+++ b/p2p/base/dtlstransportchannel_unittest.cc
@@ -75,8 +75,6 @@
transport_->SetIceRole(role);
transport_->SetIceTiebreaker(
(role == cricket::ICEROLE_CONTROLLING) ? 1 : 2);
- transport_->SignalWritableState.connect(this,
- &DtlsTestClient::OnTransportWritableState);
for (int i = 0; i < count; ++i) {
cricket::DtlsTransportChannelWrapper* channel =
@@ -193,7 +191,15 @@
}
bool all_channels_writable() const {
- return transport_->all_channels_writable();
+ if (channels_.empty()) {
+ return false;
+ }
+ for (cricket::DtlsTransportChannelWrapper* channel : channels_) {
+ if (!channel->writable()) {
+ return false;
+ }
+ }
+ return true;
}
void CheckRole(rtc::SSLRole role) {
@@ -313,11 +319,6 @@
return (num_matches < ((static_cast<int>(size) - 5) / 10));
}
- // Transport callbacks
- void OnTransportWritableState(cricket::Transport* transport) {
- LOG(LS_INFO) << name_ << ": is writable";
- }
-
// Transport channel callbacks
void OnTransportChannelWritableState(cricket::TransportChannel* channel) {
LOG(LS_INFO) << name_ << ": Channel '" << channel->component()
diff --git a/p2p/base/transport.cc b/p2p/base/transport.cc
index 3e5f1b9..66ba63e 100644
--- a/p2p/base/transport.cc
+++ b/p2p/base/transport.cc
@@ -17,7 +17,7 @@
#include "webrtc/p2p/base/port.h"
#include "webrtc/p2p/base/transportchannelimpl.h"
#include "webrtc/base/bind.h"
-#include "webrtc/base/common.h"
+#include "webrtc/base/checks.h"
#include "webrtc/base/logging.h"
namespace cricket {
@@ -69,59 +69,22 @@
: name_(name), allocator_(allocator) {}
Transport::~Transport() {
- ASSERT(channels_destroyed_);
-}
-
-bool Transport::AllChannelsCompleted() const {
- // We aren't completed until at least one channel is complete, so if there
- // are no channels, we aren't complete yet.
- if (channels_.empty()) {
- LOG(LS_INFO) << name() << " transport is not complete"
- << " because it has no TransportChannels";
- return false;
- }
-
- // A Transport's ICE process is completed if all of its channels are writable,
- // have finished allocating candidates, and have pruned all but one of their
- // connections.
- for (const auto& iter : channels_) {
- const TransportChannelImpl* channel = iter.second.get();
- bool complete =
- channel->writable() &&
- channel->GetState() == TransportChannelState::STATE_COMPLETED &&
- channel->GetIceRole() == ICEROLE_CONTROLLING &&
- channel->gathering_state() == kIceGatheringComplete;
- if (!complete) {
- LOG(LS_INFO) << name() << " transport is not complete"
- << " because a channel is still incomplete.";
- return false;
- }
- }
-
- return true;
-}
-
-bool Transport::AnyChannelFailed() const {
- for (const auto& iter : channels_) {
- if (iter.second->GetState() == TransportChannelState::STATE_FAILED) {
- return true;
- }
- }
- return false;
+ RTC_DCHECK(channels_destroyed_);
}
void Transport::SetIceRole(IceRole role) {
ice_role_ = role;
- for (auto& iter : channels_) {
- iter.second->SetIceRole(ice_role_);
+ for (const auto& kv : channels_) {
+ kv.second->SetIceRole(ice_role_);
}
}
bool Transport::GetRemoteSSLCertificate(rtc::SSLCertificate** cert) {
- if (channels_.empty())
+ if (channels_.empty()) {
return false;
+ }
- ChannelMap::iterator iter = channels_.begin();
+ auto iter = channels_.begin();
return iter->second->GetRemoteSSLCertificate(cert);
}
@@ -155,8 +118,8 @@
local_description_.reset(new TransportDescription(description));
- for (auto& iter : channels_) {
- ret &= ApplyLocalTransportDescription(iter.second.get(), error_desc);
+ for (const auto& kv : channels_) {
+ ret &= ApplyLocalTransportDescription(kv.second, error_desc);
}
if (!ret) {
return false;
@@ -186,8 +149,8 @@
}
remote_description_.reset(new TransportDescription(description));
- for (auto& iter : channels_) {
- ret &= ApplyRemoteTransportDescription(iter.second.get(), error_desc);
+ for (const auto& kv : channels_) {
+ ret &= ApplyRemoteTransportDescription(kv.second, error_desc);
}
// If PRANSWER/ANSWER is set, we should decide transport protocol type.
@@ -202,67 +165,48 @@
}
TransportChannelImpl* Transport::CreateChannel(int component) {
- TransportChannelImpl* impl;
+ TransportChannelImpl* channel;
// Create the entry if it does not exist.
- bool impl_exists = false;
- auto iterator = channels_.find(component);
- if (iterator == channels_.end()) {
- impl = CreateTransportChannel(component);
- iterator = channels_.insert(std::pair<int, ChannelMapEntry>(
- component, ChannelMapEntry(impl))).first;
+ bool channel_exists = false;
+ auto iter = channels_.find(component);
+ if (iter == channels_.end()) {
+ channel = CreateTransportChannel(component);
+ channels_.insert(std::pair<int, TransportChannelImpl*>(component, channel));
} else {
- impl = iterator->second.get();
- impl_exists = true;
+ channel = iter->second;
+ channel_exists = true;
}
- // Increase the ref count.
- iterator->second.AddRef();
channels_destroyed_ = false;
- if (impl_exists) {
- // If this is an existing channel, we should just return it without
- // connecting to all the signal again.
- return impl;
+ if (channel_exists) {
+ // If this is an existing channel, we should just return it.
+ return channel;
}
// Push down our transport state to the new channel.
- impl->SetIceRole(ice_role_);
- impl->SetIceTiebreaker(tiebreaker_);
- impl->SetIceConfig(ice_config_);
+ channel->SetIceRole(ice_role_);
+ channel->SetIceTiebreaker(tiebreaker_);
+ channel->SetIceConfig(ice_config_);
// TODO(ronghuawu): Change CreateChannel to be able to return error since
// below Apply**Description calls can fail.
if (local_description_)
- ApplyLocalTransportDescription(impl, NULL);
+ ApplyLocalTransportDescription(channel, nullptr);
if (remote_description_)
- ApplyRemoteTransportDescription(impl, NULL);
+ ApplyRemoteTransportDescription(channel, nullptr);
if (local_description_ && remote_description_)
- ApplyNegotiatedTransportDescription(impl, NULL);
-
- impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
- impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
- impl->SignalGatheringState.connect(this, &Transport::OnChannelGatheringState);
- impl->SignalCandidateGathered.connect(this,
- &Transport::OnChannelCandidateGathered);
- impl->SignalRouteChange.connect(this, &Transport::OnChannelRouteChange);
- impl->SignalRoleConflict.connect(this, &Transport::OnRoleConflict);
- impl->SignalConnectionRemoved.connect(
- this, &Transport::OnChannelConnectionRemoved);
+ ApplyNegotiatedTransportDescription(channel, nullptr);
if (connect_requested_) {
- impl->Connect();
- if (channels_.size() == 1) {
- // If this is the first channel, then indicate that we have started
- // connecting.
- SignalConnecting(this);
- }
+ channel->Connect();
}
- return impl;
+ return channel;
}
TransportChannelImpl* Transport::GetChannel(int component) {
- ChannelMap::iterator iter = channels_.find(component);
- return (iter != channels_.end()) ? iter->second.get() : NULL;
+ auto iter = channels_.find(component);
+ return (iter != channels_.end()) ? iter->second : nullptr;
}
bool Transport::HasChannels() {
@@ -270,32 +214,13 @@
}
void Transport::DestroyChannel(int component) {
- ChannelMap::iterator iter = channels_.find(component);
+ auto iter = channels_.find(component);
if (iter == channels_.end())
return;
- TransportChannelImpl* impl = NULL;
-
- iter->second.DecRef();
- if (!iter->second.ref()) {
- impl = iter->second.get();
- channels_.erase(iter);
- }
-
- if (connect_requested_ && channels_.empty()) {
- // We're no longer attempting to connect.
- SignalConnecting(this);
- }
-
- if (impl) {
- DestroyTransportChannel(impl);
- // Need to update aggregate state after destroying a channel,
- // for example if it was the only one that wasn't yet writable.
- UpdateWritableState();
- UpdateReceivingState();
- UpdateGatheringState();
- MaybeSignalCompleted();
- }
+ TransportChannelImpl* channel = iter->second;
+ channels_.erase(iter);
+ DestroyTransportChannel(channel);
}
void Transport::ConnectChannels() {
@@ -316,14 +241,11 @@
TransportDescription desc(
std::vector<std::string>(), rtc::CreateRandomString(ICE_UFRAG_LENGTH),
rtc::CreateRandomString(ICE_PWD_LENGTH), ICEMODE_FULL,
- CONNECTIONROLE_NONE, NULL, Candidates());
- SetLocalTransportDescription(desc, CA_OFFER, NULL);
+ CONNECTIONROLE_NONE, nullptr, Candidates());
+ SetLocalTransportDescription(desc, CA_OFFER, nullptr);
}
CallChannels(&TransportChannelImpl::Connect);
- if (HasChannels()) {
- SignalConnecting(this);
- }
}
void Transport::MaybeStartGathering() {
@@ -333,24 +255,16 @@
}
void Transport::DestroyAllChannels() {
- std::vector<TransportChannelImpl*> impls;
- for (auto& iter : channels_) {
- iter.second.DecRef();
- if (!iter.second.ref())
- impls.push_back(iter.second.get());
+ for (const auto& kv : channels_) {
+ DestroyTransportChannel(kv.second);
}
-
channels_.clear();
-
- for (TransportChannelImpl* impl : impls) {
- DestroyTransportChannel(impl);
- }
channels_destroyed_ = true;
}
void Transport::CallChannels(TransportChannelFunc func) {
- for (const auto& iter : channels_) {
- ((iter.second.get())->*func)();
+ for (const auto& kv : channels_) {
+ (kv.second->*func)();
}
}
@@ -389,13 +303,13 @@
bool Transport::GetStats(TransportStats* stats) {
stats->transport_name = name();
stats->channel_stats.clear();
- for (auto iter : channels_) {
- ChannelMapEntry& entry = iter.second;
+ for (auto kv : channels_) {
+ TransportChannelImpl* channel = kv.second;
TransportChannelStats substats;
- substats.component = entry->component();
- entry->GetSrtpCipher(&substats.srtp_cipher);
- entry->GetSslCipher(&substats.ssl_cipher);
- if (!entry->GetStats(&substats.connection_infos)) {
+ substats.component = channel->component();
+ channel->GetSrtpCipher(&substats.srtp_cipher);
+ channel->GetSslCipher(&substats.ssl_cipher);
+ if (!channel->GetStats(&substats.connection_infos)) {
return false;
}
stats->channel_stats.push_back(substats);
@@ -418,170 +332,15 @@
}
}
- for (std::vector<Candidate>::const_iterator iter = candidates.begin();
- iter != candidates.end();
- ++iter) {
- TransportChannelImpl* channel = GetChannel(iter->component());
- if (channel != NULL) {
- channel->AddRemoteCandidate(*iter);
+ for (const Candidate& candidate : candidates) {
+ TransportChannelImpl* channel = GetChannel(candidate.component());
+ if (channel != nullptr) {
+ channel->AddRemoteCandidate(candidate);
}
}
return true;
}
-void Transport::OnChannelWritableState(TransportChannel* channel) {
- LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
- << " writability changed to " << channel->writable()
- << ". Check if transport is complete.";
- UpdateWritableState();
- MaybeSignalCompleted();
-}
-
-void Transport::OnChannelReceivingState(TransportChannel* channel) {
- UpdateReceivingState();
-}
-
-TransportState Transport::GetTransportState(TransportStateType state_type) {
- bool any = false;
- bool all = !channels_.empty();
- for (const auto iter : channels_) {
- bool b = false;
- switch (state_type) {
- case TRANSPORT_WRITABLE_STATE:
- b = iter.second->writable();
- break;
- case TRANSPORT_RECEIVING_STATE:
- b = iter.second->receiving();
- break;
- default:
- ASSERT(false);
- }
- any |= b;
- all &= b;
- }
-
- if (all) {
- return TRANSPORT_STATE_ALL;
- } else if (any) {
- return TRANSPORT_STATE_SOME;
- }
-
- return TRANSPORT_STATE_NONE;
-}
-
-void Transport::OnChannelGatheringState(TransportChannelImpl* channel) {
- ASSERT(channels_.find(channel->component()) != channels_.end());
- UpdateGatheringState();
- if (gathering_state_ == kIceGatheringComplete) {
- // If UpdateGatheringState brought us to kIceGatheringComplete, check if
- // our connection state is also "Completed". Otherwise, there's no point in
- // checking (since it would only produce log messages).
- MaybeSignalCompleted();
- }
-}
-
-void Transport::OnChannelCandidateGathered(TransportChannelImpl* channel,
- const Candidate& candidate) {
- // We should never signal peer-reflexive candidates.
- if (candidate.type() == PRFLX_PORT_TYPE) {
- ASSERT(false);
- return;
- }
-
- ASSERT(connect_requested_);
- std::vector<Candidate> candidates;
- candidates.push_back(candidate);
- SignalCandidatesGathered(this, candidates);
-}
-
-void Transport::OnChannelRouteChange(TransportChannel* channel,
- const Candidate& remote_candidate) {
- SignalRouteChange(this, remote_candidate.component(), remote_candidate);
-}
-
-void Transport::OnRoleConflict(TransportChannelImpl* channel) {
- SignalRoleConflict();
-}
-
-void Transport::OnChannelConnectionRemoved(TransportChannelImpl* channel) {
- LOG(LS_INFO) << name() << " TransportChannel " << channel->component()
- << " connection removed. Check if transport is complete.";
- MaybeSignalCompleted();
-
- // Check if the state is now Failed.
- // Failed is only available in the Controlling ICE role.
- if (channel->GetIceRole() != ICEROLE_CONTROLLING) {
- return;
- }
-
- // Failed can only occur after candidate gathering has stopped.
- if (channel->gathering_state() != kIceGatheringComplete) {
- return;
- }
-
- if (channel->GetState() == TransportChannelState::STATE_FAILED) {
- // A Transport has failed if any of its channels have no remaining
- // connections.
- SignalFailed(this);
- }
-}
-
-void Transport::MaybeSignalCompleted() {
- if (AllChannelsCompleted()) {
- LOG(LS_INFO) << name() << " transport is complete"
- << " because all the channels are complete.";
- SignalCompleted(this);
- }
- // TODO(deadbeef): Should we do anything if we previously were completed,
- // but now are not (if, for example, a new remote candidate is added)?
-}
-
-void Transport::UpdateGatheringState() {
- IceGatheringState new_state = kIceGatheringNew;
- bool any_gathering = false;
- bool all_complete = !channels_.empty();
- for (const auto& kv : channels_) {
- any_gathering =
- any_gathering || kv.second->gathering_state() != kIceGatheringNew;
- all_complete =
- all_complete && kv.second->gathering_state() == kIceGatheringComplete;
- }
- if (all_complete) {
- new_state = kIceGatheringComplete;
- } else if (any_gathering) {
- new_state = kIceGatheringGathering;
- }
-
- if (gathering_state_ != new_state) {
- gathering_state_ = new_state;
- if (gathering_state_ == kIceGatheringGathering) {
- LOG(LS_INFO) << "Transport: " << name_ << ", gathering candidates";
- } else if (gathering_state_ == kIceGatheringComplete) {
- LOG(LS_INFO) << "Transport " << name() << " gathering complete.";
- }
- SignalGatheringState(this);
- }
-}
-
-void Transport::UpdateReceivingState() {
- TransportState receiving = GetTransportState(TRANSPORT_RECEIVING_STATE);
- if (receiving_ != receiving) {
- receiving_ = receiving;
- SignalReceivingState(this);
- }
-}
-
-void Transport::UpdateWritableState() {
- TransportState writable = GetTransportState(TRANSPORT_WRITABLE_STATE);
- LOG(LS_INFO) << name() << " transport writable state changed? " << writable_
- << " => " << writable;
- if (writable_ != writable) {
- was_writable_ = (writable_ == TRANSPORT_STATE_ALL);
- writable_ = writable;
- SignalWritableState(this);
- }
-}
-
bool Transport::ApplyLocalTransportDescription(TransportChannelImpl* ch,
std::string* error_desc) {
ch->SetIceCredentials(local_description_->ice_ufrag,
@@ -623,9 +382,10 @@
// between future SetRemote/SetLocal invocations and new channel
// creation, we have the negotiation state saved until a new
// negotiation happens.
- for (auto& iter : channels_) {
- if (!ApplyNegotiatedTransportDescription(iter.second.get(), error_desc))
+ for (const auto& kv : channels_) {
+ if (!ApplyNegotiatedTransportDescription(kv.second, error_desc)) {
return false;
+ }
}
return true;
}
diff --git a/p2p/base/transport.h b/p2p/base/transport.h
index 10b289c..a3daa39 100644
--- a/p2p/base/transport.h
+++ b/p2p/base/transport.h
@@ -65,21 +65,6 @@
kIceGatheringComplete,
};
-// For "writable" and "receiving", we need to differentiate between
-// none, all, and some.
-enum TransportState {
- TRANSPORT_STATE_NONE = 0,
- TRANSPORT_STATE_SOME,
- TRANSPORT_STATE_ALL
-};
-
-// When checking transport state, we need to differentiate between
-// "writable" or "receiving" check.
-enum TransportStateType {
- TRANSPORT_WRITABLE_STATE = 0,
- TRANSPORT_RECEIVING_STATE
-};
-
// Stats that we can return about the connections for a transport channel.
// TODO(hta): Rename to ConnectionStats
struct ConnectionInfo {
@@ -165,37 +150,10 @@
// Returns the port allocator object for this transport.
PortAllocator* port_allocator() { return allocator_; }
- // Returns the states of this manager. These bits are the ORs
- // of the corresponding bits on the managed channels. Each time one of these
- // states changes, a signal is raised.
- // TODO(honghaiz): Replace uses of writable() with any_channels_writable().
- bool writable() const { return any_channels_writable(); }
- bool was_writable() const { return was_writable_; }
- bool any_channels_writable() const {
- return (writable_ == TRANSPORT_STATE_SOME ||
- writable_ == TRANSPORT_STATE_ALL);
- }
- bool all_channels_writable() const {
- return (writable_ == TRANSPORT_STATE_ALL);
- }
- bool any_channel_receiving() const {
- return (receiving_ == TRANSPORT_STATE_SOME ||
- receiving_ == TRANSPORT_STATE_ALL);
- }
bool ready_for_remote_candidates() const {
return local_description_set_ && remote_description_set_;
}
- bool AllChannelsCompleted() const;
- bool AnyChannelFailed() const;
-
- IceGatheringState gathering_state() const { return gathering_state_; }
-
- sigslot::signal1<Transport*> SignalWritableState;
- sigslot::signal1<Transport*> SignalReceivingState;
- sigslot::signal1<Transport*> SignalCompleted;
- sigslot::signal1<Transport*> SignalFailed;
-
// Returns whether the client has requested the channels to connect.
bool connect_requested() const { return connect_requested_; }
@@ -229,6 +187,7 @@
return (NULL != GetChannel(component));
}
bool HasChannels();
+
void DestroyChannel(int component);
// Set the local TransportDescription to be used by TransportChannels.
@@ -241,10 +200,8 @@
ContentAction action,
std::string* error_desc);
- // Tells all current and future channels to start connecting. When the first
- // channel begins connecting, the following signal is raised.
+ // Tells all current and future channels to start connecting.
void ConnectChannels();
- sigslot::signal1<Transport*> SignalConnecting;
// Tells channels to start gathering candidates if necessary.
// Should be called after ConnectChannels() has been called at least once,
@@ -260,12 +217,6 @@
bool GetStats(TransportStats* stats);
- sigslot::signal1<Transport*> SignalGatheringState;
-
- // Handles sending of ready candidates and receiving of remote candidates.
- sigslot::signal2<Transport*, const std::vector<Candidate>&>
- SignalCandidatesGathered;
-
// Called when one or more candidates are ready from the remote peer.
bool AddRemoteCandidates(const std::vector<Candidate>& candidates,
std::string* error);
@@ -275,14 +226,6 @@
virtual bool VerifyCandidate(const Candidate& candidate,
std::string* error);
- // Signals when the best connection for a channel changes.
- sigslot::signal3<Transport*,
- int, // component
- const Candidate&> SignalRouteChange;
-
- // Forwards the signal from TransportChannel to BaseSession.
- sigslot::signal0<> SignalRoleConflict;
-
virtual bool GetSslRole(rtc::SSLRole* ssl_role) const { return false; }
// Must be called before channel is starting to connect.
@@ -335,74 +278,16 @@
std::string* error_desc);
private:
- struct ChannelMapEntry {
- ChannelMapEntry() : impl_(NULL), ref_(0) {}
- explicit ChannelMapEntry(TransportChannelImpl *impl)
- : impl_(impl),
- ref_(0) {
- }
-
- void AddRef() { ++ref_; }
- void DecRef() {
- ASSERT(ref_ > 0);
- --ref_;
- }
- int ref() const { return ref_; }
-
- TransportChannelImpl* get() const { return impl_; }
- TransportChannelImpl* operator->() const { return impl_; }
-
- private:
- TransportChannelImpl* impl_;
- int ref_;
- };
-
- // Candidate component => ChannelMapEntry
- typedef std::map<int, ChannelMapEntry> ChannelMap;
-
- // Called when the write state of a channel changes.
- void OnChannelWritableState(TransportChannel* channel);
-
- // Called when the receiving state of a channel changes.
- void OnChannelReceivingState(TransportChannel* channel);
-
- // Called when a channel starts finishes gathering candidates
- void OnChannelGatheringState(TransportChannelImpl* channel);
-
- // Called when a candidate is ready from channel.
- void OnChannelCandidateGathered(TransportChannelImpl* channel,
- const Candidate& candidate);
- void OnChannelRouteChange(TransportChannel* channel,
- const Candidate& remote_candidate);
- // Called when there is ICE role change.
- void OnRoleConflict(TransportChannelImpl* channel);
- // Called when the channel removes a connection.
- void OnChannelConnectionRemoved(TransportChannelImpl* channel);
+ // Candidate component => TransportChannelImpl*
+ typedef std::map<int, TransportChannelImpl*> ChannelMap;
// Helper function that invokes the given function on every channel.
typedef void (TransportChannelImpl::* TransportChannelFunc)();
void CallChannels(TransportChannelFunc func);
- // Computes the AND and OR of the channel's read/write/receiving state
- // (argument picks the operation).
- TransportState GetTransportState(TransportStateType type);
-
- // Sends SignalCompleted if we are now in that state.
- void MaybeSignalCompleted();
-
- // Sends SignalGatheringState if gathering state changed
- void UpdateGatheringState();
-
- void UpdateWritableState();
- void UpdateReceivingState();
-
const std::string name_;
PortAllocator* const allocator_;
bool channels_destroyed_ = false;
- TransportState readable_ = TRANSPORT_STATE_NONE;
- TransportState writable_ = TRANSPORT_STATE_NONE;
- TransportState receiving_ = TRANSPORT_STATE_NONE;
- bool was_writable_ = false;
bool connect_requested_ = false;
IceRole ice_role_ = ICEROLE_UNKNOWN;
uint64 tiebreaker_ = 0;
@@ -412,7 +297,6 @@
rtc::scoped_ptr<TransportDescription> remote_description_;
bool local_description_set_ = false;
bool remote_description_set_ = false;
- IceGatheringState gathering_state_ = kIceGatheringNew;
ChannelMap channels_;
diff --git a/p2p/base/transport_unittest.cc b/p2p/base/transport_unittest.cc
index 9febfe3..1f66a47 100644
--- a/p2p/base/transport_unittest.cc
+++ b/p2p/base/transport_unittest.cc
@@ -34,15 +34,7 @@
public sigslot::has_slots<> {
public:
TransportTest()
- : transport_(new FakeTransport("test content name")),
- channel_(NULL),
- connecting_signalled_(false),
- completed_(false),
- failed_(false) {
- transport_->SignalConnecting.connect(this, &TransportTest::OnConnecting);
- transport_->SignalCompleted.connect(this, &TransportTest::OnCompleted);
- transport_->SignalFailed.connect(this, &TransportTest::OnFailed);
- }
+ : transport_(new FakeTransport("test content name")), channel_(NULL) {}
~TransportTest() {
transport_->DestroyAllChannels();
}
@@ -60,30 +52,10 @@
}
protected:
- void OnConnecting(Transport* transport) {
- connecting_signalled_ = true;
- }
- void OnCompleted(Transport* transport) {
- completed_ = true;
- }
- void OnFailed(Transport* transport) {
- failed_ = true;
- }
-
rtc::scoped_ptr<FakeTransport> transport_;
FakeTransportChannel* channel_;
- bool connecting_signalled_;
- bool completed_;
- bool failed_;
};
-// Test that calling ConnectChannels triggers an OnConnecting signal.
-TEST_F(TransportTest, TestConnectChannelsDoesSignal) {
- EXPECT_TRUE(SetupChannel());
- transport_->ConnectChannels();
- EXPECT_TRUE(connecting_signalled_);
-}
-
// This test verifies channels are created with proper ICE
// role, tiebreaker and remote ice mode and credentials after offer and
// answer negotiations.
@@ -200,41 +172,6 @@
EXPECT_EQ(cricket::ICEROLE_CONTROLLING, channel_->GetIceRole());
}
-// This test verifies that the Completed and Failed states can be reached.
-TEST_F(TransportTest, TestChannelCompletedAndFailed) {
- transport_->SetIceRole(cricket::ICEROLE_CONTROLLING);
- cricket::TransportDescription local_desc(kIceUfrag1, kIcePwd1);
- ASSERT_TRUE(transport_->SetLocalTransportDescription(local_desc,
- cricket::CA_OFFER,
- NULL));
- EXPECT_TRUE(SetupChannel());
-
- cricket::TransportDescription remote_desc(kIceUfrag1, kIcePwd1);
- ASSERT_TRUE(transport_->SetRemoteTransportDescription(remote_desc,
- cricket::CA_ANSWER,
- NULL));
-
- channel_->SetConnectionCount(2);
- channel_->SetCandidatesGatheringComplete();
- channel_->SetWritable(true);
- EXPECT_TRUE_WAIT(transport_->all_channels_writable(), 100);
- // ICE is not yet completed because there is still more than one connection.
- EXPECT_FALSE(completed_);
- EXPECT_FALSE(failed_);
-
- // When the connection count drops to 1, SignalCompleted should be emitted,
- // and completed() should be true.
- channel_->SetConnectionCount(1);
- EXPECT_TRUE_WAIT(completed_, 100);
- completed_ = false;
-
- // When the connection count drops to 0, SignalFailed should be emitted, and
- // completed() should be false.
- channel_->SetConnectionCount(0);
- EXPECT_TRUE_WAIT(failed_, 100);
- EXPECT_FALSE(completed_);
-}
-
// Tests channel role is reversed after receiving ice-lite from remote.
TEST_F(TransportTest, TestSetRemoteIceLiteInOffer) {
transport_->SetIceRole(cricket::ICEROLE_CONTROLLED);
@@ -293,23 +230,3 @@
EXPECT_EQ(1, stats.channel_stats[0].component);
}
-TEST_F(TransportTest, TestReceivingStateChange) {
- ASSERT_TRUE(SetupChannel());
- channel_->SetConnectionCount(1);
- transport_->ConnectChannels();
- EXPECT_FALSE(transport_->any_channel_receiving());
-
- channel_->SetReceiving(true);
- EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
- FakeTransportChannel* channel2 = CreateChannel(2);
- channel2->SetReceiving(true);
- EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
-
- channel2->SetReceiving(false);
- EXPECT_TRUE_WAIT(transport_->any_channel_receiving(), 100);
-
- // After both channels become not receiving, the transport receiving state
- // becomes TRANSPORT_STATE_NONE.
- channel_->SetReceiving(false);
- EXPECT_TRUE_WAIT(!transport_->any_channel_receiving(), 100);
-}
diff --git a/p2p/base/transportcontroller.cc b/p2p/base/transportcontroller.cc
index d84d574..22b827a 100644
--- a/p2p/base/transportcontroller.cc
+++ b/p2p/base/transportcontroller.cc
@@ -10,11 +10,14 @@
#include "webrtc/p2p/base/transportcontroller.h"
+#include <algorithm>
+
#include "webrtc/base/bind.h"
#include "webrtc/base/checks.h"
#include "webrtc/base/thread.h"
#include "webrtc/p2p/base/dtlstransport.h"
#include "webrtc/p2p/base/p2ptransport.h"
+#include "webrtc/p2p/base/port.h"
namespace cricket {
@@ -140,8 +143,32 @@
int component) {
RTC_DCHECK(worker_thread_->IsCurrent());
+ auto it = FindChannel_w(transport_name, component);
+ if (it != channels_.end()) {
+ // Channel already exists; increment reference count and return.
+ it->AddRef();
+ return it->get();
+ }
+
+ // Need to create a new channel.
Transport* transport = GetOrCreateTransport_w(transport_name);
- return transport->CreateChannel(component);
+ TransportChannelImpl* channel = transport->CreateChannel(component);
+ channel->SignalWritableState.connect(
+ this, &TransportController::OnChannelWritableState_w);
+ channel->SignalReceivingState.connect(
+ this, &TransportController::OnChannelReceivingState_w);
+ channel->SignalGatheringState.connect(
+ this, &TransportController::OnChannelGatheringState_w);
+ channel->SignalCandidateGathered.connect(
+ this, &TransportController::OnChannelCandidateGathered_w);
+ channel->SignalRoleConflict.connect(
+ this, &TransportController::OnChannelRoleConflict_w);
+ channel->SignalConnectionRemoved.connect(
+ this, &TransportController::OnChannelConnectionRemoved_w);
+ channels_.insert(channels_.end(), RefCountedChannel(channel))->AddRef();
+ // Adding a channel could cause aggregate state to change.
+ UpdateAggregateStates_w();
+ return channel;
}
void TransportController::DestroyTransportChannel_w(
@@ -149,18 +176,29 @@
int component) {
RTC_DCHECK(worker_thread_->IsCurrent());
- Transport* transport = GetTransport_w(transport_name);
- if (!transport) {
- ASSERT(false);
+ auto it = FindChannel_w(transport_name, component);
+ if (it == channels_.end()) {
+ LOG(LS_WARNING) << "Attempting to delete " << transport_name
+ << " TransportChannel " << component
+ << ", which doesn't exist.";
return;
}
- transport->DestroyChannel(component);
+ it->DecRef();
+ if (it->ref() > 0) {
+ return;
+ }
+
+ channels_.erase(it);
+ Transport* transport = GetTransport_w(transport_name);
+ transport->DestroyChannel(component);
// Just as we create a Transport when its first channel is created,
// we delete it when its last channel is deleted.
if (!transport->HasChannels()) {
DestroyTransport_w(transport_name);
}
+ // Removing a channel could cause aggregate state to change.
+ UpdateAggregateStates_w();
}
const rtc::scoped_refptr<rtc::RTCCertificate>&
@@ -221,6 +259,17 @@
}
}
+std::vector<TransportController::RefCountedChannel>::iterator
+TransportController::FindChannel_w(const std::string& transport_name,
+ int component) {
+ return std::find_if(
+ channels_.begin(), channels_.end(),
+ [transport_name, component](const RefCountedChannel& channel) {
+ return channel->transport_name() == transport_name &&
+ channel->component() == component;
+ });
+}
+
Transport* TransportController::GetOrCreateTransport_w(
const std::string& transport_name) {
RTC_DCHECK(worker_thread_->IsCurrent());
@@ -240,22 +289,6 @@
if (certificate_) {
transport->SetLocalCertificate(certificate_);
}
- transport->SignalConnecting.connect(
- this, &TransportController::OnTransportConnecting_w);
- transport->SignalWritableState.connect(
- this, &TransportController::OnTransportWritableState_w);
- transport->SignalReceivingState.connect(
- this, &TransportController::OnTransportReceivingState_w);
- transport->SignalCompleted.connect(
- this, &TransportController::OnTransportCompleted_w);
- transport->SignalFailed.connect(this,
- &TransportController::OnTransportFailed_w);
- transport->SignalGatheringState.connect(
- this, &TransportController::OnTransportGatheringState_w);
- transport->SignalCandidatesGathered.connect(
- this, &TransportController::OnTransportCandidatesGathered_w);
- transport->SignalRoleConflict.connect(
- this, &TransportController::OnTransportRoleConflict_w);
transports_[transport_name] = transport;
return transport;
@@ -270,8 +303,6 @@
delete iter->second;
transports_.erase(transport_name);
}
- // Destroying a transport may cause aggregate state to change.
- UpdateAggregateStates_w();
}
void TransportController::DestroyAllTransports_w() {
@@ -447,49 +478,49 @@
return transport->GetStats(stats);
}
-void TransportController::OnTransportConnecting_w(Transport* transport) {
+void TransportController::OnChannelWritableState_w(TransportChannel* channel) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ LOG(LS_INFO) << channel->transport_name() << " TransportChannel "
+ << channel->component() << " writability changed to "
+ << channel->writable() << ".";
+ UpdateAggregateStates_w();
+}
+
+void TransportController::OnChannelReceivingState_w(TransportChannel* channel) {
RTC_DCHECK(worker_thread_->IsCurrent());
UpdateAggregateStates_w();
}
-void TransportController::OnTransportWritableState_w(Transport* transport) {
+void TransportController::OnChannelGatheringState_w(
+ TransportChannelImpl* channel) {
RTC_DCHECK(worker_thread_->IsCurrent());
UpdateAggregateStates_w();
}
-void TransportController::OnTransportReceivingState_w(Transport* transport) {
+void TransportController::OnChannelCandidateGathered_w(
+ TransportChannelImpl* channel,
+ const Candidate& candidate) {
RTC_DCHECK(worker_thread_->IsCurrent());
- UpdateAggregateStates_w();
-}
-void TransportController::OnTransportCompleted_w(Transport* transport) {
- RTC_DCHECK(worker_thread_->IsCurrent());
- UpdateAggregateStates_w();
-}
-
-void TransportController::OnTransportFailed_w(Transport* transport) {
- RTC_DCHECK(worker_thread_->IsCurrent());
- UpdateAggregateStates_w();
-}
-
-void TransportController::OnTransportGatheringState_w(Transport* transport) {
- RTC_DCHECK(worker_thread_->IsCurrent());
- UpdateAggregateStates_w();
-}
-
-void TransportController::OnTransportCandidatesGathered_w(
- Transport* transport,
- const std::vector<Candidate>& candidates) {
- RTC_DCHECK(worker_thread_->IsCurrent());
- CandidatesData* data = new CandidatesData(transport->name(), candidates);
+ // We should never signal peer-reflexive candidates.
+ if (candidate.type() == PRFLX_PORT_TYPE) {
+ RTC_DCHECK(false);
+ return;
+ }
+ std::vector<Candidate> candidates;
+ candidates.push_back(candidate);
+ CandidatesData* data =
+ new CandidatesData(channel->transport_name(), candidates);
signaling_thread_->Post(this, MSG_CANDIDATESGATHERED, data);
}
-void TransportController::OnTransportRoleConflict_w() {
+void TransportController::OnChannelRoleConflict_w(
+ TransportChannelImpl* channel) {
RTC_DCHECK(worker_thread_->IsCurrent());
if (ice_role_switch_) {
- LOG(LS_WARNING) << "Repeat of role conflict signal from Transport.";
+ LOG(LS_WARNING)
+ << "Repeat of role conflict signal from TransportChannelImpl.";
return;
}
@@ -502,6 +533,15 @@
}
}
+void TransportController::OnChannelConnectionRemoved_w(
+ TransportChannelImpl* channel) {
+ RTC_DCHECK(worker_thread_->IsCurrent());
+ LOG(LS_INFO) << channel->transport_name() << " TransportChannel "
+ << channel->component()
+ << " connection removed. Check if state is complete.";
+ UpdateAggregateStates_w();
+}
+
void TransportController::UpdateAggregateStates_w() {
RTC_DCHECK(worker_thread_->IsCurrent());
@@ -509,24 +549,24 @@
IceGatheringState new_gathering_state = kIceGatheringNew;
bool any_receiving = false;
bool any_failed = false;
- bool all_connected = HasChannels_w();
- bool all_completed = HasChannels_w();
+ bool all_connected = !channels_.empty();
+ bool all_completed = !channels_.empty();
bool any_gathering = false;
- bool all_done_gathering = HasChannels_w();
- for (const auto& kv : transports_) {
- // Ignore transports without channels since they're about to be deleted,
- // and their state is meaningless.
- if (!kv.second->HasChannels()) {
- continue;
- }
- any_receiving = any_receiving || kv.second->any_channel_receiving();
- any_failed = any_failed || kv.second->AnyChannelFailed();
- all_connected = all_connected && kv.second->all_channels_writable();
- all_completed = all_completed && kv.second->AllChannelsCompleted();
+ bool all_done_gathering = !channels_.empty();
+ for (const auto& channel : channels_) {
+ any_receiving = any_receiving || channel->receiving();
+ any_failed = any_failed ||
+ channel->GetState() == TransportChannelState::STATE_FAILED;
+ all_connected = all_connected && channel->writable();
+ all_completed =
+ all_completed && channel->writable() &&
+ channel->GetState() == TransportChannelState::STATE_COMPLETED &&
+ channel->GetIceRole() == ICEROLE_CONTROLLING &&
+ channel->gathering_state() == kIceGatheringComplete;
any_gathering =
- any_gathering || kv.second->gathering_state() != kIceGatheringNew;
+ any_gathering || channel->gathering_state() != kIceGatheringNew;
all_done_gathering = all_done_gathering &&
- kv.second->gathering_state() == kIceGatheringComplete;
+ channel->gathering_state() == kIceGatheringComplete;
}
if (any_failed) {
@@ -562,13 +602,4 @@
}
}
-bool TransportController::HasChannels_w() {
- for (const auto& kv : transports_) {
- if (kv.second->HasChannels()) {
- return true;
- }
- }
- return false;
-}
-
} // namespace cricket
diff --git a/p2p/base/transportcontroller.h b/p2p/base/transportcontroller.h
index f506e01..45fcfea 100644
--- a/p2p/base/transportcontroller.h
+++ b/p2p/base/transportcontroller.h
@@ -81,9 +81,14 @@
bool ReadyForRemoteCandidates(const std::string& transport_name);
bool GetStats(const std::string& transport_name, TransportStats* stats);
+ // Creates a channel if it doesn't exist. Otherwise, increments a reference
+ // count and returns an existing channel.
virtual TransportChannel* CreateTransportChannel_w(
const std::string& transport_name,
int component);
+
+ // Decrements a channel's reference count, and destroys the channel if
+ // nothing is referencing it.
virtual void DestroyTransportChannel_w(const std::string& transport_name,
int component);
@@ -121,6 +126,33 @@
private:
void OnMessage(rtc::Message* pmsg) override;
+ // It's the Transport that's currently responsible for creating/destroying
+ // channels, but the TransportController keeps track of how many external
+ // objects (BaseChannels) reference each channel.
+ struct RefCountedChannel {
+ RefCountedChannel() : impl_(nullptr), ref_(0) {}
+ explicit RefCountedChannel(TransportChannelImpl* impl)
+ : impl_(impl), ref_(0) {}
+
+ void AddRef() { ++ref_; }
+ void DecRef() {
+ ASSERT(ref_ > 0);
+ --ref_;
+ }
+ int ref() const { return ref_; }
+
+ TransportChannelImpl* get() const { return impl_; }
+ TransportChannelImpl* operator->() const { return impl_; }
+
+ private:
+ TransportChannelImpl* impl_;
+ int ref_;
+ };
+
+ std::vector<RefCountedChannel>::iterator FindChannel_w(
+ const std::string& transport_name,
+ int component);
+
Transport* GetOrCreateTransport_w(const std::string& transport_name);
void DestroyTransport_w(const std::string& transport_name);
void DestroyAllTransports_w();
@@ -152,29 +184,27 @@
bool GetStats_w(const std::string& transport_name, TransportStats* stats);
// Handlers for signals from Transport.
- void OnTransportConnecting_w(Transport* transport);
- void OnTransportWritableState_w(Transport* transport);
- void OnTransportReceivingState_w(Transport* transport);
- void OnTransportCompleted_w(Transport* transport);
- void OnTransportFailed_w(Transport* transport);
- void OnTransportGatheringState_w(Transport* transport);
- void OnTransportCandidatesGathered_w(
- Transport* transport,
- const std::vector<Candidate>& candidates);
- void OnTransportRoleConflict_w();
+ void OnChannelWritableState_w(TransportChannel* channel);
+ void OnChannelReceivingState_w(TransportChannel* channel);
+ void OnChannelGatheringState_w(TransportChannelImpl* channel);
+ void OnChannelCandidateGathered_w(TransportChannelImpl* channel,
+ const Candidate& candidate);
+ void OnChannelRoleConflict_w(TransportChannelImpl* channel);
+ void OnChannelConnectionRemoved_w(TransportChannelImpl* channel);
void UpdateAggregateStates_w();
- bool HasChannels_w();
rtc::Thread* const signaling_thread_ = nullptr;
rtc::Thread* const worker_thread_ = nullptr;
typedef std::map<std::string, Transport*> TransportMap;
TransportMap transports_;
+ std::vector<RefCountedChannel> channels_;
+
PortAllocator* const port_allocator_ = nullptr;
rtc::SSLProtocolVersion ssl_max_version_ = rtc::SSL_PROTOCOL_DTLS_10;
- // Aggregate state for Transports
+ // Aggregate state for TransportChannelImpls.
IceConnectionState connection_state_ = kIceConnectionConnecting;
bool receiving_ = false;
IceGatheringState gathering_state_ = kIceGatheringNew;