Replace readable with receiving where receiving means receiving anything (stun ping, response or data packet).
If a connection does not receive for 30 seconds, it will be deleted.
BUG=
Review URL: https://codereview.webrtc.org/1351673003
Cr-Commit-Position: refs/heads/master@{#10001}
diff --git a/talk/app/webrtc/statscollector.cc b/talk/app/webrtc/statscollector.cc
index 6327445..76ac76d 100644
--- a/talk/app/webrtc/statscollector.cc
+++ b/talk/app/webrtc/statscollector.cc
@@ -603,9 +603,9 @@
report->set_timestamp(stats_gathering_started_);
const BoolForAdd bools[] = {
- { StatsReport::kStatsValueNameActiveConnection, info.best_connection },
- { StatsReport::kStatsValueNameReadable, info.readable },
- { StatsReport::kStatsValueNameWritable, info.writable },
+ {StatsReport::kStatsValueNameActiveConnection, info.best_connection},
+ {StatsReport::kStatsValueNameReceiving, info.receiving},
+ {StatsReport::kStatsValueNameWritable, info.writable},
};
for (const auto& b : bools)
report->AddBoolean(b.name, b.value);
diff --git a/talk/app/webrtc/statstypes.cc b/talk/app/webrtc/statstypes.cc
index 56d705e..51ec7fd 100644
--- a/talk/app/webrtc/statstypes.cc
+++ b/talk/app/webrtc/statstypes.cc
@@ -556,7 +556,7 @@
return "googPlisSent";
case kStatsValueNamePreferredJitterBufferMs:
return "googPreferredJitterBufferMs";
- case kStatsValueNameReadable:
+ case kStatsValueNameReceiving:
return "googReadable";
case kStatsValueNameRemoteAddress:
return "googRemoteAddress";
diff --git a/talk/app/webrtc/statstypes.h b/talk/app/webrtc/statstypes.h
index 5d5d717..33b2fa7 100644
--- a/talk/app/webrtc/statstypes.h
+++ b/talk/app/webrtc/statstypes.h
@@ -125,7 +125,7 @@
kStatsValueNamePacketsReceived,
kStatsValueNamePacketsSent,
kStatsValueNameProtocol,
- kStatsValueNameReadable,
+ kStatsValueNameReceiving,
kStatsValueNameSelectedCandidatePairId,
kStatsValueNameSsrc,
kStatsValueNameState,
diff --git a/webrtc/p2p/base/dtlstransportchannel.cc b/webrtc/p2p/base/dtlstransportchannel.cc
index 3474237..dcebdee 100644
--- a/webrtc/p2p/base/dtlstransportchannel.cc
+++ b/webrtc/p2p/base/dtlstransportchannel.cc
@@ -97,8 +97,6 @@
dtls_state_(STATE_NONE),
ssl_role_(rtc::SSL_CLIENT),
ssl_max_version_(rtc::SSL_PROTOCOL_DTLS_10) {
- channel_->SignalReadableState.connect(this,
- &DtlsTransportChannelWrapper::OnReadableState);
channel_->SignalWritableState.connect(this,
&DtlsTransportChannelWrapper::OnWritableState);
channel_->SignalReadPacket.connect(this,
@@ -392,25 +390,12 @@
// (1) If we're not doing DTLS-SRTP, then the state is just the
// state of the underlying impl()
// (2) If we're doing DTLS-SRTP:
-// - Prior to the DTLS handshake, the state is neither readable or
+// - Prior to the DTLS handshake, the state is neither receiving nor
// writable
// - When the impl goes writable for the first time we
// start the DTLS handshake
// - Once the DTLS handshake completes, the state is that of the
// impl again
-void DtlsTransportChannelWrapper::OnReadableState(TransportChannel* channel) {
- ASSERT(rtc::Thread::Current() == worker_thread_);
- ASSERT(channel == channel_);
- LOG_J(LS_VERBOSE, this)
- << "DTLSTransportChannelWrapper: channel readable state changed to "
- << channel_->readable();
-
- if (dtls_state_ == STATE_NONE || dtls_state_ == STATE_OPEN) {
- set_readable(channel_->readable());
- // Note: SignalReadableState fired by set_readable.
- }
-}
-
void DtlsTransportChannelWrapper::OnWritableState(TransportChannel* channel) {
ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(channel == channel_);
@@ -545,8 +530,6 @@
// The check for OPEN shouldn't be necessary but let's make
// sure we don't accidentally frob the state if it's closed.
dtls_state_ = STATE_OPEN;
-
- set_readable(true);
set_writable(true);
}
}
@@ -564,8 +547,6 @@
} else {
LOG_J(LS_INFO, this) << "DTLS channel error, code=" << err;
}
-
- set_readable(false);
set_writable(false);
dtls_state_ = STATE_CLOSED;
}
diff --git a/webrtc/p2p/base/p2ptransportchannel.cc b/webrtc/p2p/base/p2ptransportchannel.cc
index 094a8dc..ee99060 100644
--- a/webrtc/p2p/base/p2ptransportchannel.cc
+++ b/webrtc/p2p/base/p2ptransportchannel.cc
@@ -221,6 +221,7 @@
void P2PTransportChannel::AddConnection(Connection* connection) {
connections_.push_back(connection);
connection->set_remote_ice_mode(remote_ice_mode_);
+ connection->set_receiving_timeout(receiving_timeout_);
connection->SignalReadPacket.connect(
this, &P2PTransportChannel::OnReadPacket);
connection->SignalReadyToSend.connect(
@@ -340,6 +341,10 @@
receiving_timeout_ = receiving_timeout_ms;
check_receiving_delay_ =
std::max(MIN_CHECK_RECEIVING_DELAY, receiving_timeout_ / 10);
+
+ for (Connection* connection : connections_) {
+ connection->set_receiving_timeout(receiving_timeout_);
+ }
LOG(LS_VERBOSE) << "Set ICE receiving timeout to " << receiving_timeout_
<< " milliseconds";
}
@@ -400,7 +405,7 @@
std::vector<RemoteCandidate>::iterator iter;
for (iter = remote_candidates_.begin(); iter != remote_candidates_.end();
++iter) {
- CreateConnection(port, *iter, iter->origin_port(), false);
+ CreateConnection(port, *iter, iter->origin_port());
}
SortConnections();
@@ -616,7 +621,7 @@
}
// Create connections to this remote candidate.
- CreateConnections(candidate, NULL, false);
+ CreateConnections(candidate, NULL);
// Resort the connections list, which may have new elements.
SortConnections();
@@ -626,8 +631,7 @@
// remote candidate. The return value is true if we created a connection from
// the origin port.
bool P2PTransportChannel::CreateConnections(const Candidate& remote_candidate,
- PortInterface* origin_port,
- bool readable) {
+ PortInterface* origin_port) {
ASSERT(worker_thread_ == rtc::Thread::Current());
Candidate new_remote_candidate(remote_candidate);
@@ -665,7 +669,7 @@
bool created = false;
std::vector<PortInterface *>::reverse_iterator it;
for (it = ports_.rbegin(); it != ports_.rend(); ++it) {
- if (CreateConnection(*it, new_remote_candidate, origin_port, readable)) {
+ if (CreateConnection(*it, new_remote_candidate, origin_port)) {
if (*it == origin_port)
created = true;
}
@@ -673,8 +677,7 @@
if ((origin_port != NULL) &&
std::find(ports_.begin(), ports_.end(), origin_port) == ports_.end()) {
- if (CreateConnection(
- origin_port, new_remote_candidate, origin_port, readable))
+ if (CreateConnection(origin_port, new_remote_candidate, origin_port))
created = true;
}
@@ -688,8 +691,7 @@
// And then listen to connection object for changes.
bool P2PTransportChannel::CreateConnection(PortInterface* port,
const Candidate& remote_candidate,
- PortInterface* origin_port,
- bool readable) {
+ PortInterface* origin_port) {
// Look for an existing connection with this remote address. If one is not
// found, then we can create a new connection for this address.
Connection* connection = port->GetConnection(remote_candidate.address());
@@ -724,11 +726,6 @@
<< connections_.size() << " total)";
}
- // If we are readable, it is because we are creating this in response to a
- // ping from the other side. This will cause the state to become readable.
- if (readable)
- connection->ReceivedPing();
-
return true;
}
@@ -853,8 +850,7 @@
Connection *connection = *it;
ConnectionInfo info;
info.best_connection = (best_connection_ == connection);
- info.readable =
- (connection->read_state() == Connection::STATE_READABLE);
+ info.receiving = connection->receiving();
info.writable =
(connection->write_state() == Connection::STATE_WRITABLE);
info.timeout =
@@ -1029,8 +1025,7 @@
LOG_J(LS_INFO, this) << "New best connection: "
<< best_connection_->ToString();
SignalRouteChange(this, best_connection_->remote_candidate());
- // When it just switched to a best connection, set receiving to true.
- set_receiving(true);
+ set_receiving(best_connection_->receiving());
} else {
LOG_J(LS_INFO, this) << "No best connection";
}
@@ -1046,14 +1041,8 @@
if (writable != this->writable())
LOG(LS_ERROR) << "UpdateChannelState: writable state mismatch";
- bool readable = false;
- for (uint32 i = 0; i < connections_.size(); ++i) {
- if (connections_[i]->read_state() == Connection::STATE_READABLE) {
- readable = true;
- break;
- }
- }
- set_readable(readable);
+ // TODO(honghaiz): The channel receiving state is set in OnCheckReceiving.
+ // Will revisit in a subsequent code change.
}
// We checked the status of our connections and we had at least one that
@@ -1144,10 +1133,7 @@
}
void P2PTransportChannel::OnCheckReceiving() {
- // Check receiving only if the best connection has received data packets
- // because we want to detect not receiving any packets only after the media
- // have started flowing.
- if (best_connection_ && best_connection_->recv_total_bytes() > 0) {
+ if (best_connection_) {
bool receiving = rtc::Time() <=
best_connection_->last_received() + receiving_timeout_;
set_receiving(receiving);
@@ -1171,23 +1157,13 @@
// An never connected connection cannot be written to at all, so pinging is
// out of the question. However, if it has become WRITABLE, it is in the
// reconnecting state so ping is needed.
- if (!conn->connected() && conn->write_state() != Connection::STATE_WRITABLE) {
+ if (!conn->connected() && !conn->writable()) {
return false;
}
- if (writable()) {
- // If we are writable, then we only want to ping connections that could be
- // better than this one, i.e., the ones that were not pruned.
- return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT);
- } else {
- // If we are not writable, then we need to try everything that might work.
- // This includes both connections that do not have write timeout as well as
- // ones that do not have read timeout. A connection could be readable but
- // be in write-timeout if we pruned it before. Since the other side is
- // still pinging it, it very well might still work.
- return (conn->write_state() != Connection::STATE_WRITE_TIMEOUT) ||
- (conn->read_state() != Connection::STATE_READ_TIMEOUT);
- }
+ // If the channel is not writable, ping all candidates. Otherwise, we only
+ // want to ping connections that have not timed out on writing.
+ return !writable() || conn->write_state() != Connection::STATE_WRITE_TIMEOUT;
}
// Returns the next pingable connection to ping. This will be the oldest
diff --git a/webrtc/p2p/base/p2ptransportchannel.h b/webrtc/p2p/base/p2ptransportchannel.h
index a00c17e..a8f1666 100644
--- a/webrtc/p2p/base/p2ptransportchannel.h
+++ b/webrtc/p2p/base/p2ptransportchannel.h
@@ -176,10 +176,11 @@
void HandleAllTimedOut();
Connection* GetBestConnectionOnNetwork(rtc::Network* network) const;
- bool CreateConnections(const Candidate &remote_candidate,
- PortInterface* origin_port, bool readable);
- bool CreateConnection(PortInterface* port, const Candidate& remote_candidate,
- PortInterface* origin_port, bool readable);
+ bool CreateConnections(const Candidate& remote_candidate,
+ PortInterface* origin_port);
+ bool CreateConnection(PortInterface* port,
+ const Candidate& remote_candidate,
+ PortInterface* origin_port);
bool FindConnection(cricket::Connection* connection) const;
uint32 GetRemoteCandidateGeneration(const Candidate& candidate);
diff --git a/webrtc/p2p/base/p2ptransportchannel_unittest.cc b/webrtc/p2p/base/p2ptransportchannel_unittest.cc
index 3bb3be5..d0277f4 100644
--- a/webrtc/p2p/base/p2ptransportchannel_unittest.cc
+++ b/webrtc/p2p/base/p2ptransportchannel_unittest.cc
@@ -492,9 +492,9 @@
CreateChannels(1);
EXPECT_TRUE_WAIT_MARGIN(ep1_ch1() != NULL &&
ep2_ch1() != NULL &&
- ep1_ch1()->readable() &&
+ ep1_ch1()->receiving() &&
ep1_ch1()->writable() &&
- ep2_ch1()->readable() &&
+ ep2_ch1()->receiving() &&
ep2_ch1()->writable(),
expected.connect_wait,
1000);
@@ -561,7 +561,7 @@
}
}
- // This test waits for the transport to become readable and writable on both
+ // This test waits for the transport to become receiving and writable on both
// end points. Once they are, the end points set new local ice credentials to
// restart the ice gathering. Finally it waits for the transport to select a
// new connection using the newly generated ice candidates.
@@ -569,8 +569,8 @@
void TestHandleIceUfragPasswordChanged() {
ep1_ch1()->SetRemoteIceCredentials(kIceUfrag[1], kIcePwd[1]);
ep2_ch1()->SetRemoteIceCredentials(kIceUfrag[0], kIcePwd[0]);
- EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000, 1000);
const cricket::Candidate* old_local_candidate1 = LocalCandidate(ep1_ch1());
@@ -614,9 +614,9 @@
EXPECT_TRUE_WAIT(GetRoleConflict(0), 1000);
EXPECT_FALSE(GetRoleConflict(1));
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() &&
ep1_ch1()->writable() &&
- ep2_ch1()->readable() &&
+ ep2_ch1()->receiving() &&
ep2_ch1()->writable(),
1000);
@@ -1103,8 +1103,8 @@
kDefaultPortAllocatorFlags,
kDefaultPortAllocatorFlags);
CreateChannels(1);
- EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT_MARGIN(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000, 1000);
TestSendRecv(1);
cricket::ConnectionInfos infos;
@@ -1112,7 +1112,7 @@
ASSERT_EQ(1U, infos.size());
EXPECT_TRUE(infos[0].new_connection);
EXPECT_TRUE(infos[0].best_connection);
- EXPECT_TRUE(infos[0].readable);
+ EXPECT_TRUE(infos[0].receiving);
EXPECT_TRUE(infos[0].writable);
EXPECT_FALSE(infos[0].timeout);
EXPECT_EQ(10U, infos[0].sent_total_packets);
@@ -1241,9 +1241,9 @@
// Pump for 1 second and verify that the channels are not connected.
rtc::Thread::Current()->ProcessMessages(1000);
- EXPECT_FALSE(ep1_ch1()->readable());
+ EXPECT_FALSE(ep1_ch1()->receiving());
EXPECT_FALSE(ep1_ch1()->writable());
- EXPECT_FALSE(ep2_ch1()->readable());
+ EXPECT_FALSE(ep2_ch1()->receiving());
EXPECT_FALSE(ep2_ch1()->writable());
DestroyChannels();
@@ -1261,8 +1261,8 @@
ep1_ch1()->set_incoming_only(true);
EXPECT_TRUE_WAIT_MARGIN(ep1_ch1() != NULL && ep2_ch1() != NULL &&
- ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000, 1000);
DestroyChannels();
@@ -1287,8 +1287,8 @@
CreateChannels(1);
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000);
EXPECT_TRUE(
ep1_ch1()->best_connection() && ep2_ch1()->best_connection() &&
@@ -1343,9 +1343,9 @@
EXPECT_EQ(kTiebreaker1, ports_before[i]->IceTiebreaker());
}
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() &&
ep1_ch1()->writable() &&
- ep2_ch1()->readable() &&
+ ep2_ch1()->receiving() &&
ep2_ch1()->writable(),
1000);
@@ -1400,8 +1400,8 @@
CreateChannels(1);
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000);
EXPECT_TRUE(
ep1_ch1()->best_connection() && ep2_ch1()->best_connection() &&
@@ -1426,10 +1426,8 @@
CreateChannels(1);
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() &&
- ep1_ch1()->writable() &&
- ep2_ch1()->readable() &&
- ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
2000);
EXPECT_TRUE(ep1_ch1()->best_connection() &&
@@ -1508,8 +1506,8 @@
// Create channels and let them go writable, as usual.
CreateChannels(1);
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000);
EXPECT_TRUE(
ep1_ch1()->best_connection() && ep2_ch1()->best_connection() &&
@@ -1552,8 +1550,8 @@
// Create channels and let them go writable, as usual.
CreateChannels(1);
- EXPECT_TRUE_WAIT(ep1_ch1()->readable() && ep1_ch1()->writable() &&
- ep2_ch1()->readable() && ep2_ch1()->writable(),
+ EXPECT_TRUE_WAIT(ep1_ch1()->receiving() && ep1_ch1()->writable() &&
+ ep2_ch1()->receiving() && ep2_ch1()->writable(),
1000);
EXPECT_TRUE(
ep1_ch1()->best_connection() && ep2_ch1()->best_connection() &&
@@ -1706,14 +1704,14 @@
uint32 remote_priority = conn1->remote_candidate().priority();
// Create a higher priority candidate and make the connection
- // readable/writable. This will prune conn1.
+ // receiving/writable. This will prune conn1.
ch.OnCandidate(CreateCandidate("2.2.2.2", 2, 2));
cricket::Connection* conn2 = WaitForConnectionTo(&ch, "2.2.2.2", 2);
ASSERT_TRUE(conn2 != nullptr);
conn2->ReceivedPing();
conn2->ReceivedPingResponse();
- // Wait for conn1 being destroyed.
+ // Wait for conn1 to be destroyed.
EXPECT_TRUE_WAIT(GetConnectionTo(&ch, "1.1.1.1", 1) == nullptr, 3000);
cricket::Port* port = GetPort(&ch);
@@ -1905,7 +1903,7 @@
ch.OnCandidate(CreateCandidate("2.2.2.2", 2, 1));
cricket::Connection* conn2 = WaitForConnectionTo(&ch, "2.2.2.2", 2);
ASSERT_TRUE(conn2 != nullptr);
- conn2->ReceivedPing(); // Become readable.
+ conn2->ReceivedPing(); // Start receiving.
// Do not switch because it is not writable.
conn2->OnReadPacket("ABC", 3, rtc::CreatePacketTime(0));
EXPECT_EQ(conn1, ch.best_connection());
diff --git a/webrtc/p2p/base/port.cc b/webrtc/p2p/base/port.cc
index da66928..d6bc27b 100644
--- a/webrtc/p2p/base/port.cc
+++ b/webrtc/p2p/base/port.cc
@@ -782,8 +782,8 @@
: port_(port),
local_candidate_index_(index),
remote_candidate_(remote_candidate),
- read_state_(STATE_READ_INIT),
write_state_(STATE_WRITE_INIT),
+ receiving_(false),
connected_(true),
pruned_(false),
use_candidate_attr_(false),
@@ -800,7 +800,8 @@
sent_packets_discarded_(0),
sent_packets_total_(0),
reported_(false),
- state_(STATE_WAITING) {
+ state_(STATE_WAITING),
+ receiving_timeout_(WEAK_CONNECTION_RECEIVE_TIMEOUT) {
// All of our connections start in WAITING state.
// TODO(mallinath) - Start connections from STATE_FROZEN.
// Wire up to send stun packets
@@ -841,16 +842,6 @@
return priority;
}
-void Connection::set_read_state(ReadState value) {
- ReadState old_value = read_state_;
- read_state_ = value;
- if (value != old_value) {
- LOG_J(LS_VERBOSE, this) << "set_read_state";
- SignalStateChange(this);
- CheckTimeout();
- }
-}
-
void Connection::set_write_state(WriteState value) {
WriteState old_value = write_state_;
write_state_ = value;
@@ -862,6 +853,15 @@
}
}
+void Connection::set_receiving(bool value) {
+ if (value != receiving_) {
+ LOG_J(LS_VERBOSE, this) << "set_receiving to " << value;
+ receiving_ = value;
+ SignalStateChange(this);
+ CheckTimeout();
+ }
+}
+
void Connection::set_state(State state) {
State old_state = state_;
state_ = state;
@@ -902,27 +902,17 @@
const rtc::SocketAddress& addr(remote_candidate_.address());
if (!port_->GetStunMessage(data, size, addr, msg.accept(), &remote_ufrag)) {
// The packet did not parse as a valid STUN message
+ // This is a data packet, pass it along.
+ set_receiving(true);
+ last_data_received_ = rtc::Time();
+ recv_rate_tracker_.AddSamples(size);
+ SignalReadPacket(this, data, size, packet_time);
- // If this connection is readable, then pass along the packet.
- if (read_state_ == STATE_READABLE) {
- // readable means data from this address is acceptable
- // Send it on!
- last_data_received_ = rtc::Time();
- recv_rate_tracker_.AddSamples(size);
- SignalReadPacket(this, data, size, packet_time);
-
- // If timed out sending writability checks, start up again
- if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) {
- LOG(LS_WARNING) << "Received a data packet on a timed-out Connection. "
- << "Resetting state to STATE_WRITE_INIT.";
- set_write_state(STATE_WRITE_INIT);
- }
- } else {
- // Not readable means the remote address hasn't sent a valid
- // binding request yet.
-
- LOG_J(LS_WARNING, this)
- << "Received non-STUN packet from an unreadable connection.";
+ // If timed out sending writability checks, start up again
+ if (!pruned_ && (write_state_ == STATE_WRITE_TIMEOUT)) {
+ LOG(LS_WARNING) << "Received a data packet on a timed-out Connection. "
+ << "Resetting state to STATE_WRITE_INIT.";
+ set_write_state(STATE_WRITE_INIT);
}
} else if (!msg) {
// The packet was STUN, but failed a check and was handled internally.
@@ -992,12 +982,7 @@
// Otherwise we can mark connection to read timeout. No response will be
// sent in this scenario.
case STUN_BINDING_INDICATION:
- if (read_state_ == STATE_READABLE) {
- ReceivedPing();
- } else {
- LOG_J(LS_WARNING, this) << "Received STUN binding indication "
- << "from an unreadable connection.";
- }
+ ReceivedPing();
break;
default:
@@ -1024,8 +1009,7 @@
void Connection::Destroy() {
LOG_J(LS_VERBOSE, this) << "Connection destroyed";
- set_read_state(STATE_READ_TIMEOUT);
- set_write_state(STATE_WRITE_TIMEOUT);
+ port_->thread()->Post(this, MSG_DELETE);
}
void Connection::PrintPingsSinceLastResponse(std::string* s, size_t max) {
@@ -1089,7 +1073,6 @@
<< " rtt=" << rtt;
set_write_state(STATE_WRITE_UNRELIABLE);
}
-
if ((write_state_ == STATE_WRITE_UNRELIABLE ||
write_state_ == STATE_WRITE_INIT) &&
TooLongWithoutResponse(pings_since_last_response_,
@@ -1101,6 +1084,11 @@
<< ", rtt=" << rtt;
set_write_state(STATE_WRITE_TIMEOUT);
}
+
+ // Check the receiving state.
+ uint32 last_recv_time = last_received();
+ bool receiving = now <= last_recv_time + receiving_timeout_;
+ set_receiving(receiving);
}
void Connection::Ping(uint32 now) {
@@ -1114,8 +1102,8 @@
}
void Connection::ReceivedPing() {
+ set_receiving(true);
last_ping_received_ = rtc::Time();
- set_read_state(STATE_READABLE);
}
void Connection::ReceivedPingResponse() {
@@ -1124,6 +1112,7 @@
// So if we're not already, become writable. We may be bringing a pruned
// connection back to life, but if we don't really want it, we can always
// prune it again.
+ set_receiving(true);
set_write_state(STATE_WRITABLE);
set_state(STATE_SUCCEEDED);
pings_since_last_response_.clear();
@@ -1141,10 +1130,9 @@
'-', // not connected (false)
'C', // connected (true)
};
- const char READ_STATE_ABBREV[3] = {
- '-', // STATE_READ_INIT
- 'R', // STATE_READABLE
- 'x', // STATE_READ_TIMEOUT
+ const char RECEIVE_STATE_ABBREV[2] = {
+ '-', // not receiving (false)
+ 'R', // receiving (true)
};
const char WRITE_STATE_ABBREV[4] = {
'W', // STATE_WRITABLE
@@ -1172,7 +1160,7 @@
<< ":" << remote.type() << ":"
<< remote.protocol() << ":" << remote.address().ToSensitiveString() << "|"
<< CONNECT_STATE_ABBREV[connected()]
- << READ_STATE_ABBREV[read_state()]
+ << RECEIVE_STATE_ABBREV[receiving()]
<< WRITE_STATE_ABBREV[write_state()]
<< ICESTATE[state()] << "|"
<< priority() << "|";
@@ -1197,11 +1185,6 @@
uint32 rtt = request->Elapsed();
ReceivedPingResponse();
- if (remote_ice_mode_ == ICEMODE_LITE) {
- // A ice-lite end point never initiates ping requests. This will allow
- // us to move to STATE_READABLE without an incoming ping request.
- set_read_state(STATE_READABLE);
- }
if (LOG_CHECK_LEVEL_V(sev)) {
bool use_candidate = (
@@ -1269,14 +1252,8 @@
}
void Connection::CheckTimeout() {
- // If both read and write have timed out or read has never initialized, then
- // this connection can contribute no more to p2p socket unless at some later
- // date readability were to come back. However, we gave readability a long
- // time to timeout, so at this point, it seems fair to get rid of this
- // connection.
- if ((read_state_ == STATE_READ_TIMEOUT ||
- read_state_ == STATE_READ_INIT) &&
- write_state_ == STATE_WRITE_TIMEOUT) {
+ // If write has timed out and it is not receiving, remove the connection.
+ if (!receiving_ && write_state_ == STATE_WRITE_TIMEOUT) {
port_->thread()->Post(this, MSG_DELETE);
}
}
diff --git a/webrtc/p2p/base/port.h b/webrtc/p2p/base/port.h
index fbda9ce..8e7a259 100644
--- a/webrtc/p2p/base/port.h
+++ b/webrtc/p2p/base/port.h
@@ -50,8 +50,12 @@
extern const char TCPTYPE_PASSIVE_STR[];
extern const char TCPTYPE_SIMOPEN_STR[];
-// The length of time we wait before timing out readability on a connection.
-const uint32 CONNECTION_READ_TIMEOUT = 30 * 1000; // 30 seconds
+// If a connection does not receive anything for this long, it is considered
+// dead.
+const uint32 DEAD_CONNECTION_RECEIVE_TIMEOUT = 30 * 1000; // 30 seconds.
+
+// The timeout duration when a connection does not receive anything.
+const uint32 WEAK_CONNECTION_RECEIVE_TIMEOUT = 2500; // 2.5 seconds
// The length of time we wait before timing out writability on a connection.
const uint32 CONNECTION_WRITE_TIMEOUT = 15 * 1000; // 15 seconds
@@ -417,15 +421,6 @@
// Returns the pair priority.
uint64 priority() const;
- enum ReadState {
- STATE_READ_INIT = 0, // we have yet to receive a ping
- STATE_READABLE = 1, // we have received pings recently
- STATE_READ_TIMEOUT = 2, // we haven't received pings in a while
- };
-
- ReadState read_state() const { return read_state_; }
- bool readable() const { return read_state_ == STATE_READABLE; }
-
enum WriteState {
STATE_WRITABLE = 0, // we have received ping responses recently
STATE_WRITE_UNRELIABLE = 1, // we have had a few ping failures
@@ -435,6 +430,7 @@
WriteState write_state() const { return write_state_; }
bool writable() const { return write_state_ == STATE_WRITABLE; }
+ bool receiving() const { return receiving_; }
// Determines whether the connection has finished connecting. This can only
// be false for TCP connections.
@@ -466,8 +462,8 @@
// Error if Send() returns < 0
virtual int GetError() = 0;
- sigslot::signal4<Connection*, const char*, size_t,
- const rtc::PacketTime&> SignalReadPacket;
+ sigslot::signal4<Connection*, const char*, size_t, const rtc::PacketTime&>
+ SignalReadPacket;
sigslot::signal1<Connection*> SignalReadyToSend;
@@ -495,6 +491,10 @@
remote_ice_mode_ = mode;
}
+ void set_receiving_timeout(uint32 receiving_timeout_ms) {
+ receiving_timeout_ = receiving_timeout_ms;
+ }
+
// Makes the connection go away.
void Destroy();
@@ -565,8 +565,8 @@
void OnConnectionRequestSent(ConnectionRequest* req);
// Changes the state and signals if necessary.
- void set_read_state(ReadState value);
void set_write_state(WriteState value);
+ void set_receiving(bool value);
void set_state(State state);
void set_connected(bool value);
@@ -578,8 +578,8 @@
Port* port_;
size_t local_candidate_index_;
Candidate remote_candidate_;
- ReadState read_state_;
WriteState write_state_;
+ bool receiving_;
bool connected_;
bool pruned_;
// By default |use_candidate_attr_| flag will be true,
@@ -611,6 +611,8 @@
bool reported_;
State state_;
+ // Time duration to switch from receiving to not receiving.
+ uint32 receiving_timeout_;
friend class Port;
friend class ConnectionRequest;
diff --git a/webrtc/p2p/base/port_unittest.cc b/webrtc/p2p/base/port_unittest.cc
index 453b77c..c4fa5f8 100644
--- a/webrtc/p2p/base/port_unittest.cc
+++ b/webrtc/p2p/base/port_unittest.cc
@@ -821,7 +821,7 @@
if (same_addr1 && same_addr2) {
// The new ping got back to the source.
- EXPECT_EQ(Connection::STATE_READABLE, ch1.conn()->read_state());
+ EXPECT_TRUE(ch1.conn()->receiving());
EXPECT_EQ(Connection::STATE_WRITABLE, ch2.conn()->write_state());
// First connection may not be writable if the first ping did not get
@@ -841,7 +841,7 @@
// able to get a ping from it. This gives us the real source address.
ch1.Ping();
EXPECT_TRUE_WAIT(!ch2.remote_address().IsNil(), kTimeout);
- EXPECT_EQ(Connection::STATE_READ_INIT, ch2.conn()->read_state());
+ EXPECT_FALSE(ch2.conn()->receiving());
EXPECT_TRUE(ch1.remote_address().IsNil());
// Pick up the actual address and establish the connection.
@@ -854,7 +854,7 @@
// The new ping came in, but from an unexpected address. This will happen
// when the destination NAT is symmetric.
EXPECT_FALSE(ch1.remote_address().IsNil());
- EXPECT_EQ(Connection::STATE_READ_INIT, ch1.conn()->read_state());
+ EXPECT_FALSE(ch1.conn()->receiving());
// Update our address and complete the connection.
ch1.AcceptConnection(GetCandidate(port2));
@@ -876,14 +876,14 @@
ASSERT_TRUE(ch1.conn() != NULL);
ASSERT_TRUE(ch2.conn() != NULL);
if (possible) {
- EXPECT_EQ(Connection::STATE_READABLE, ch1.conn()->read_state());
+ EXPECT_TRUE(ch1.conn()->receiving());
EXPECT_EQ(Connection::STATE_WRITABLE, ch1.conn()->write_state());
- EXPECT_EQ(Connection::STATE_READABLE, ch2.conn()->read_state());
+ EXPECT_TRUE(ch2.conn()->receiving());
EXPECT_EQ(Connection::STATE_WRITABLE, ch2.conn()->write_state());
} else {
- EXPECT_NE(Connection::STATE_READABLE, ch1.conn()->read_state());
+ EXPECT_FALSE(ch1.conn()->receiving());
EXPECT_NE(Connection::STATE_WRITABLE, ch1.conn()->write_state());
- EXPECT_NE(Connection::STATE_READABLE, ch2.conn()->read_state());
+ EXPECT_FALSE(ch2.conn()->receiving());
EXPECT_NE(Connection::STATE_WRITABLE, ch2.conn()->write_state());
}
@@ -1273,7 +1273,7 @@
// response.
lport->Reset();
lport->AddCandidateAddress(kLocalAddr2);
- // Creating a different connection as |conn| is in STATE_READABLE.
+ // Creating a different connection as |conn| is receiving.
Connection* conn1 = lport->CreateConnection(lport->Candidates()[1],
Port::ORIGIN_MESSAGE);
conn1->Ping(0);
diff --git a/webrtc/p2p/base/transport.cc b/webrtc/p2p/base/transport.cc
index 3322e38..d626ad3 100644
--- a/webrtc/p2p/base/transport.cc
+++ b/webrtc/p2p/base/transport.cc
@@ -25,7 +25,6 @@
enum {
MSG_ONSIGNALINGREADY = 1,
MSG_ONREMOTECANDIDATE,
- MSG_READSTATE,
MSG_WRITESTATE,
MSG_REQUESTSIGNALING,
MSG_CANDIDATEREADY,
@@ -238,7 +237,6 @@
if (local_description_ && remote_description_)
ApplyNegotiatedTransportDescription_w(impl, NULL);
- impl->SignalReadableState.connect(this, &Transport::OnChannelReadableState);
impl->SignalWritableState.connect(this, &Transport::OnChannelWritableState);
impl->SignalReceivingState.connect(this, &Transport::OnChannelReceivingState);
impl->SignalRequestSignaling.connect(
@@ -493,20 +491,6 @@
}
}
-void Transport::OnChannelReadableState(TransportChannel* channel) {
- ASSERT(worker_thread()->IsCurrent());
- signaling_thread()->Post(this, MSG_READSTATE, NULL);
-}
-
-void Transport::OnChannelReadableState_s() {
- ASSERT(signaling_thread()->IsCurrent());
- TransportState readable = GetTransportState_s(TRANSPORT_READABLE_STATE);
- if (readable_ != readable) {
- readable_ = readable;
- SignalReadableState(this);
- }
-}
-
void Transport::OnChannelWritableState(TransportChannel* channel) {
ASSERT(worker_thread()->IsCurrent());
signaling_thread()->Post(this, MSG_WRITESTATE, NULL);
@@ -547,9 +531,6 @@
for (const auto iter : channels_) {
bool b = false;
switch (state_type) {
- case TRANSPORT_READABLE_STATE:
- b = iter.second->readable();
- break;
case TRANSPORT_WRITABLE_STATE:
b = iter.second->writable();
break;
@@ -870,9 +851,6 @@
case MSG_CONNECTING:
OnConnecting_s();
break;
- case MSG_READSTATE:
- OnChannelReadableState_s();
- break;
case MSG_WRITESTATE:
OnChannelWritableState_s();
break;
diff --git a/webrtc/p2p/base/transport.h b/webrtc/p2p/base/transport.h
index 5539655..72a0895 100644
--- a/webrtc/p2p/base/transport.h
+++ b/webrtc/p2p/base/transport.h
@@ -54,7 +54,7 @@
typedef std::vector<Candidate> Candidates;
-// For "writable", "readable", and "receiving", we need to differentiate between
+// For "writable" and "receiving", we need to differentiate between
// none, all, and some.
enum TransportState {
TRANSPORT_STATE_NONE = 0,
@@ -63,10 +63,9 @@
};
// When checking transport state, we need to differentiate between
-// "readable", "writable", or "receiving" check.
+// "writable" or "receiving" check.
enum TransportStateType {
- TRANSPORT_READABLE_STATE = 0,
- TRANSPORT_WRITABLE_STATE,
+ TRANSPORT_WRITABLE_STATE = 0,
TRANSPORT_RECEIVING_STATE
};
@@ -76,7 +75,7 @@
ConnectionInfo()
: best_connection(false),
writable(false),
- readable(false),
+ receiving(false),
timeout(false),
new_connection(false),
rtt(0),
@@ -90,7 +89,7 @@
bool best_connection; // Is this the best connection we have?
bool writable; // Has this connection received a STUN response?
- bool readable; // Has this connection received a STUN request?
+ bool receiving; // Has this connection received anything?
bool timeout; // Has this connection timed out?
bool new_connection; // Is this a newly created connection?
size_t rtt; // The STUN RTT for this connection.
@@ -156,25 +155,16 @@
// Returns the port allocator object for this transport.
PortAllocator* port_allocator() { return allocator_; }
- // Returns the readable and states of this manager. These bits are the ORs
+ // Returns the states of this manager. These bits are the ORs
// of the corresponding bits on the managed channels. Each time one of these
// states changes, a signal is raised.
- // TODO: Replace uses of readable() and writable() with
- // any_channels_readable() and any_channels_writable().
- bool readable() const { return any_channels_readable(); }
+ // TODO(honghaiz): Replace uses of writable() with any_channels_writable().
bool writable() const { return any_channels_writable(); }
bool was_writable() const { return was_writable_; }
- bool any_channels_readable() const {
- return (readable_ == TRANSPORT_STATE_SOME ||
- readable_ == TRANSPORT_STATE_ALL);
- }
bool any_channels_writable() const {
return (writable_ == TRANSPORT_STATE_SOME ||
writable_ == TRANSPORT_STATE_ALL);
}
- bool all_channels_readable() const {
- return (readable_ == TRANSPORT_STATE_ALL);
- }
bool all_channels_writable() const {
return (writable_ == TRANSPORT_STATE_ALL);
}
@@ -183,7 +173,6 @@
receiving_ == TRANSPORT_STATE_ALL);
}
- sigslot::signal1<Transport*> SignalReadableState;
sigslot::signal1<Transport*> SignalWritableState;
sigslot::signal1<Transport*> SignalReceivingState;
sigslot::signal1<Transport*> SignalCompleted;
@@ -374,8 +363,7 @@
// Candidate component => ChannelMapEntry
typedef std::map<int, ChannelMapEntry> ChannelMap;
- // Called when the state of a channel changes.
- void OnChannelReadableState(TransportChannel* channel);
+ // Called when the write state of a channel changes.
void OnChannelWritableState(TransportChannel* channel);
// Called when the receiving state of a channel changes.
@@ -409,7 +397,6 @@
void ResetChannels_w();
void DestroyAllChannels_w();
void OnRemoteCandidate_w(const Candidate& candidate);
- void OnChannelReadableState_s();
void OnChannelWritableState_s();
void OnChannelReceivingState_s();
void OnChannelRequestSignaling_s();
diff --git a/webrtc/p2p/base/transportchannel.cc b/webrtc/p2p/base/transportchannel.cc
index 5fb0eb4..5d5a7c9 100644
--- a/webrtc/p2p/base/transportchannel.cc
+++ b/webrtc/p2p/base/transportchannel.cc
@@ -15,22 +15,14 @@
namespace cricket {
std::string TransportChannel::ToString() const {
- const char READABLE_ABBREV[2] = { '_', 'R' };
+ const char RECEIVING_ABBREV[2] = { '_', 'R' };
const char WRITABLE_ABBREV[2] = { '_', 'W' };
std::stringstream ss;
- ss << "Channel[" << content_name_
- << "|" << component_
- << "|" << READABLE_ABBREV[readable_] << WRITABLE_ABBREV[writable_] << "]";
+ ss << "Channel[" << content_name_ << "|" << component_ << "|"
+ << RECEIVING_ABBREV[receiving_] << WRITABLE_ABBREV[writable_] << "]";
return ss.str();
}
-void TransportChannel::set_readable(bool readable) {
- if (readable_ != readable) {
- readable_ = readable;
- SignalReadableState(this);
- }
-}
-
void TransportChannel::set_receiving(bool receiving) {
if (receiving_ == receiving) {
return;
diff --git a/webrtc/p2p/base/transportchannel.h b/webrtc/p2p/base/transportchannel.h
index f492e4e..60d1ed0 100644
--- a/webrtc/p2p/base/transportchannel.h
+++ b/webrtc/p2p/base/transportchannel.h
@@ -46,7 +46,8 @@
explicit TransportChannel(const std::string& content_name, int component)
: content_name_(content_name),
component_(component),
- readable_(false), writable_(false), receiving_(false) {}
+ writable_(false),
+ receiving_(false) {}
virtual ~TransportChannel() {}
// TODO(guoweis) - Make this pure virtual once all subclasses of
@@ -62,13 +63,10 @@
const std::string& content_name() const { return content_name_; }
int component() const { return component_; }
- // Returns the readable and states of this channel. Each time one of these
- // states changes, a signal is raised. These states are aggregated by the
- // TransportManager.
- bool readable() const { return readable_; }
+ // Returns the states of this channel. Each time one of these states changes,
+ // a signal is raised. These states are aggregated by the TransportManager.
bool writable() const { return writable_; }
bool receiving() const { return receiving_; }
- sigslot::signal1<TransportChannel*> SignalReadableState;
sigslot::signal1<TransportChannel*> SignalWritableState;
// Emitted when the TransportChannel's ability to send has changed.
sigslot::signal1<TransportChannel*> SignalReadyToSend;
@@ -139,8 +137,6 @@
std::string ToString() const;
protected:
- // Sets the readable state, signaling if necessary.
- void set_readable(bool readable);
// Sets the writable state, signaling if necessary.
void set_writable(bool writable);
@@ -153,7 +149,6 @@
// Used mostly for debugging.
std::string content_name_;
int component_;
- bool readable_;
bool writable_;
bool receiving_;
diff --git a/webrtc/p2p/base/transportchannelproxy.cc b/webrtc/p2p/base/transportchannelproxy.cc
index f7946dd..74d1e1d 100644
--- a/webrtc/p2p/base/transportchannelproxy.cc
+++ b/webrtc/p2p/base/transportchannelproxy.cc
@@ -55,10 +55,10 @@
impl_ = impl;
if (impl_) {
- impl_->SignalReadableState.connect(
- this, &TransportChannelProxy::OnReadableState);
impl_->SignalWritableState.connect(
this, &TransportChannelProxy::OnWritableState);
+ impl_->SignalReceivingState.connect(
+ this, &TransportChannelProxy::OnReceivingState);
impl_->SignalReadPacket.connect(
this, &TransportChannelProxy::OnReadPacket);
impl_->SignalReadyToSend.connect(
@@ -229,18 +229,18 @@
return impl_->GetIceRole();
}
-void TransportChannelProxy::OnReadableState(TransportChannel* channel) {
- ASSERT(rtc::Thread::Current() == worker_thread_);
- ASSERT(channel == impl_);
- set_readable(impl_->readable());
- // Note: SignalReadableState fired by set_readable.
-}
-
void TransportChannelProxy::OnWritableState(TransportChannel* channel) {
ASSERT(rtc::Thread::Current() == worker_thread_);
ASSERT(channel == impl_);
set_writable(impl_->writable());
- // Note: SignalWritableState fired by set_readable.
+ // Note: SignalWritableState fired by set_writable.
+}
+
+void TransportChannelProxy::OnReceivingState(TransportChannel* channel) {
+ ASSERT(rtc::Thread::Current() == worker_thread_);
+ ASSERT(channel == impl_);
+ set_receiving(impl_->receiving());
+ // Note: SignalReceivingState fired by set_receiving.
}
void TransportChannelProxy::OnReadPacket(
@@ -267,9 +267,9 @@
void TransportChannelProxy::OnMessage(rtc::Message* msg) {
ASSERT(rtc::Thread::Current() == worker_thread_);
if (msg->message_id == MSG_UPDATESTATE) {
- // If impl_ is already readable or writable, push up those signals.
- set_readable(impl_ ? impl_->readable() : false);
- set_writable(impl_ ? impl_->writable() : false);
+ // If impl_ is already receiving or writable, push up those signals.
+ set_writable(impl_ ? impl_->writable() : false);
+ set_receiving(impl_ ? impl_->receiving() : false);
}
}
diff --git a/webrtc/p2p/base/transportchannelproxy.h b/webrtc/p2p/base/transportchannelproxy.h
index f10f507..80ee20a 100644
--- a/webrtc/p2p/base/transportchannelproxy.h
+++ b/webrtc/p2p/base/transportchannelproxy.h
@@ -72,7 +72,7 @@
private:
// Catch signals from the implementation channel. These just forward to the
// client (after updating our state to match).
- void OnReadableState(TransportChannel* channel);
+ void OnReceivingState(TransportChannel* channel);
void OnWritableState(TransportChannel* channel);
void OnReadPacket(TransportChannel* channel, const char* data, size_t size,
const rtc::PacketTime& packet_time, int flags);
diff --git a/webrtc/p2p/base/turnport_unittest.cc b/webrtc/p2p/base/turnport_unittest.cc
index a90ea03..724485d 100644
--- a/webrtc/p2p/base/turnport_unittest.cc
+++ b/webrtc/p2p/base/turnport_unittest.cc
@@ -364,7 +364,7 @@
conn1->Ping(0);
WAIT(!turn_unknown_address_, kTimeout);
EXPECT_FALSE(turn_unknown_address_);
- EXPECT_EQ(Connection::STATE_READ_INIT, conn1->read_state());
+ EXPECT_FALSE(conn1->receiving());
EXPECT_EQ(Connection::STATE_WRITE_INIT, conn1->write_state());
// Send ping from TURN to UDP.
@@ -375,14 +375,14 @@
conn2->Ping(0);
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, conn2->write_state(), kTimeout);
- EXPECT_EQ(Connection::STATE_READABLE, conn1->read_state());
- EXPECT_EQ(Connection::STATE_READ_INIT, conn2->read_state());
+ EXPECT_TRUE(conn1->receiving());
+ EXPECT_TRUE(conn2->receiving());
EXPECT_EQ(Connection::STATE_WRITE_INIT, conn1->write_state());
// Send another ping from UDP to TURN.
conn1->Ping(0);
EXPECT_EQ_WAIT(Connection::STATE_WRITABLE, conn1->write_state(), kTimeout);
- EXPECT_EQ(Connection::STATE_READABLE, conn2->read_state());
+ EXPECT_TRUE(conn2->receiving());
}
void TestTurnSendData() {