Implements TODOs for webrtc::datachannel state management when the SCTP association is congested. Adds missing state variables for each step in the transitions between DataChannelInterface::DataStates (kConnecting, kOpen, etc.), and uses them.
BUG=https://code.google.com/p/chromium/issues/detail?id=474650
R=jiayl@webrtc.org, pthatcher@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/44299004
Cr-Commit-Position: refs/heads/master@{#9331}
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 1897b73..559eec5 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -109,24 +109,26 @@
state_(kConnecting),
data_channel_type_(dct),
provider_(provider),
- waiting_for_open_ack_(false),
- was_ever_writable_(false),
+ handshake_state_(kHandshakeInit),
connected_to_provider_(false),
send_ssrc_set_(false),
receive_ssrc_set_(false),
+ writable_(false),
send_ssrc_(0),
receive_ssrc_(0) {
}
bool DataChannel::Init(const InternalDataChannelInit& config) {
- if (data_channel_type_ == cricket::DCT_RTP &&
- (config.reliable ||
- config.id != -1 ||
- config.maxRetransmits != -1 ||
- config.maxRetransmitTime != -1)) {
- LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
- << "invalid DataChannelInit.";
- return false;
+ if (data_channel_type_ == cricket::DCT_RTP) {
+ if (config.reliable ||
+ config.id != -1 ||
+ config.maxRetransmits != -1 ||
+ config.maxRetransmitTime != -1) {
+ LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
+ << "invalid DataChannelInit.";
+ return false;
+ }
+ handshake_state_ = kHandshakeReady;
} else if (data_channel_type_ == cricket::DCT_SCTP) {
if (config.id < -1 ||
config.maxRetransmits < -1 ||
@@ -142,6 +144,18 @@
}
config_ = config;
+ switch (config_.open_handshake_role) {
+ case webrtc::InternalDataChannelInit::kNone: // pre-negotiated
+ handshake_state_ = kHandshakeReady;
+ break;
+ case webrtc::InternalDataChannelInit::kOpener:
+ handshake_state_ = kHandshakeShouldSendOpen;
+ break;
+ case webrtc::InternalDataChannelInit::kAcker:
+ handshake_state_ = kHandshakeShouldSendAck;
+ break;
+ };
+
// Try to connect to the transport in case the transport channel already
// exists.
OnTransportChannelCreated();
@@ -298,7 +312,7 @@
if (params.type == cricket::DMT_CONTROL) {
ASSERT(data_channel_type_ == cricket::DCT_SCTP);
- if (!waiting_for_open_ack_) {
+ if (handshake_state_ != kHandshakeWaitingForAck) {
// Ignore it if we are not expecting an ACK message.
LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
<< "sid = " << params.ssrc;
@@ -306,7 +320,7 @@
}
if (ParseDataChannelOpenAckMessage(payload)) {
// We can send unordered as soon as we receive the ACK message.
- waiting_for_open_ack_ = false;
+ handshake_state_ = kHandshakeReady;
LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
<< params.ssrc;
} else {
@@ -323,11 +337,13 @@
// We can send unordered as soon as we receive any DATA message since the
// remote side must have received the OPEN (and old clients do not send
// OPEN_ACK).
- waiting_for_open_ack_ = false;
+ if (handshake_state_ == kHandshakeWaitingForAck) {
+ handshake_state_ = kHandshakeReady;
+ }
bool binary = (params.type == cricket::DMT_BINARY);
rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
- if (was_ever_writable_ && observer_) {
+ if (state_ == kOpen && observer_) {
observer_->OnMessage(*buffer.get());
} else {
if (queued_received_data_.byte_count() + payload.size() >
@@ -346,38 +362,14 @@
}
void DataChannel::OnChannelReady(bool writable) {
+ writable_ = writable;
if (!writable) {
return;
}
- // Update the readyState and send the queued control message if the channel
- // is writable for the first time; otherwise it means the channel was blocked
- // for sending and now unblocked, so send the queued data now.
- if (!was_ever_writable_) {
- was_ever_writable_ = true;
- if (data_channel_type_ == cricket::DCT_SCTP) {
- rtc::Buffer payload;
-
- if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
- WriteDataChannelOpenMessage(label_, config_, &payload);
- SendControlMessage(payload);
- } else if (config_.open_handshake_role ==
- InternalDataChannelInit::kAcker) {
- WriteDataChannelOpenAckMessage(&payload);
- SendControlMessage(payload);
- }
- }
-
- UpdateState();
- ASSERT(queued_send_data_.Empty());
- } else if (state_ == kOpen) {
- // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
- // that the readyState is open. According to the standard, the channel
- // should not become open before the OPEN message is sent.
- SendQueuedControlMessages();
-
- SendQueuedDataMessages();
- }
+ SendQueuedControlMessages();
+ SendQueuedDataMessages();
+ UpdateState();
}
void DataChannel::DoClose() {
@@ -391,20 +383,34 @@
}
void DataChannel::UpdateState() {
+ // UpdateState determines what to do from a few state variables. Include
+ // all conditions required for each state transition here for
+ // clarity. OnChannelReady(true) will send any queued data and then invoke
+ // UpdateState().
switch (state_) {
case kConnecting: {
if (send_ssrc_set_ == receive_ssrc_set_) {
if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
connected_to_provider_ = provider_->ConnectDataChannel(this);
}
- if (was_ever_writable_) {
- // TODO(jiayl): Do not transition to kOpen if we failed to send the
- // OPEN message.
- SendQueuedControlMessages();
- SetState(kOpen);
- // If we have received buffers before the channel got writable.
- // Deliver them now.
- DeliverQueuedReceivedData();
+ if (connected_to_provider_) {
+ if (handshake_state_ == kHandshakeShouldSendOpen) {
+ rtc::Buffer payload;
+ WriteDataChannelOpenMessage(label_, config_, &payload);
+ SendControlMessage(payload);
+ } else if (handshake_state_ == kHandshakeShouldSendAck) {
+ rtc::Buffer payload;
+ WriteDataChannelOpenAckMessage(&payload);
+ SendControlMessage(payload);
+ }
+ if (writable_ &&
+ (handshake_state_ == kHandshakeReady ||
+ handshake_state_ == kHandshakeWaitingForAck)) {
+ SetState(kOpen);
+ // If we have received buffers before the channel got writable.
+ // Deliver them now.
+ DeliverQueuedReceivedData();
+ }
}
}
break;
@@ -413,10 +419,14 @@
break;
}
case kClosing: {
- DisconnectFromTransport();
+ if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
+ if (connected_to_provider_) {
+ DisconnectFromProvider();
+ }
- if (!send_ssrc_set_ && !receive_ssrc_set_) {
- SetState(kClosed);
+ if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) {
+ SetState(kClosed);
+ }
}
break;
}
@@ -435,7 +445,7 @@
}
}
-void DataChannel::DisconnectFromTransport() {
+void DataChannel::DisconnectFromProvider() {
if (!connected_to_provider_)
return;
@@ -448,7 +458,7 @@
}
void DataChannel::DeliverQueuedReceivedData() {
- if (!was_ever_writable_ || !observer_) {
+ if (!observer_) {
return;
}
@@ -460,7 +470,11 @@
}
void DataChannel::SendQueuedDataMessages() {
- ASSERT(was_ever_writable_ && state_ == kOpen);
+ if (queued_send_data_.Empty()) {
+ return;
+ }
+
+ ASSERT(state_ == kOpen || state_ == kClosing);
while (!queued_send_data_.Empty()) {
DataBuffer* buffer = queued_send_data_.Front();
@@ -479,8 +493,8 @@
if (data_channel_type_ == cricket::DCT_SCTP) {
send_params.ordered = config_.ordered;
- // Send as ordered if it is waiting for the OPEN_ACK message.
- if (waiting_for_open_ack_ && !config_.ordered) {
+ // Send as ordered if it is still going through OPEN/ACK signaling.
+ if (handshake_state_ != kHandshakeReady && !config_.ordered) {
send_params.ordered = true;
LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
<< "because the OPEN_ACK message has not been received.";
@@ -529,8 +543,6 @@
}
void DataChannel::SendQueuedControlMessages() {
- ASSERT(was_ever_writable_);
-
PacketQueue control_packets;
control_packets.Swap(&queued_control_data_);
@@ -546,16 +558,18 @@
}
bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
- bool is_open_message =
- (config_.open_handshake_role == InternalDataChannelInit::kOpener);
+ bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
- was_ever_writable_ &&
+ writable_ &&
config_.id >= 0 &&
(!is_open_message || !config_.negotiated));
cricket::SendDataParams send_params;
send_params.ssrc = config_.id;
+ // Send data as ordered before we receive any message from the remote peer to
+ // make sure the remote peer will not receive any data before it receives the
+ // OPEN message.
send_params.ordered = config_.ordered || is_open_message;
send_params.type = cricket::DMT_CONTROL;
@@ -564,11 +578,10 @@
if (retval) {
LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
- if (is_open_message) {
- // Send data as ordered before we receive any message from the remote peer
- // to make sure the remote peer will not receive any data before it
- // receives the OPEN message.
- waiting_for_open_ack_ = true;
+ if (handshake_state_ == kHandshakeShouldSendAck) {
+ handshake_state_ = kHandshakeReady;
+ } else if (handshake_state_ == kHandshakeShouldSendOpen) {
+ handshake_state_ = kHandshakeWaitingForAck;
}
} else if (send_result == cricket::SDR_BLOCK) {
QueueControlMessage(buffer);
diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h
index fe8fac1..8e58d06 100644
--- a/talk/app/webrtc/datachannel.h
+++ b/talk/app/webrtc/datachannel.h
@@ -204,11 +204,20 @@
size_t byte_count_;
};
+ // The OPEN(_ACK) signaling state.
+ enum HandshakeState {
+ kHandshakeInit,
+ kHandshakeShouldSendOpen,
+ kHandshakeShouldSendAck,
+ kHandshakeWaitingForAck,
+ kHandshakeReady
+ };
+
bool Init(const InternalDataChannelInit& config);
void DoClose();
void UpdateState();
void SetState(DataState state);
- void DisconnectFromTransport();
+ void DisconnectFromProvider();
void DeliverQueuedReceivedData();
@@ -226,11 +235,11 @@
DataState state_;
cricket::DataChannelType data_channel_type_;
DataChannelProviderInterface* provider_;
- bool waiting_for_open_ack_;
- bool was_ever_writable_;
+ HandshakeState handshake_state_;
bool connected_to_provider_;
bool send_ssrc_set_;
bool receive_ssrc_set_;
+ bool writable_;
uint32 send_ssrc_;
uint32 receive_ssrc_;
// Control messages that always have to get sent out before any queued
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc
index ab5dbe9..bc4f81c 100644
--- a/talk/app/webrtc/datachannel_unittest.cc
+++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -269,6 +269,41 @@
EXPECT_FALSE(provider_.last_send_data_params().ordered);
}
+// Tests that the channel can't open until it's successfully sent the OPEN
+// message.
+TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) {
+ webrtc::DataBuffer buffer("foo");
+
+ provider_.set_send_blocked(true);
+ SetChannelReady();
+ EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
+ webrtc_data_channel_->state());
+ provider_.set_send_blocked(false);
+ EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
+ webrtc_data_channel_->state(), 1000);
+ EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type);
+}
+
+// Tests that close first makes sure all queued data gets sent.
+TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
+ webrtc::DataBuffer buffer("foo");
+
+ provider_.set_send_blocked(true);
+ SetChannelReady();
+ EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
+ webrtc_data_channel_->state());
+ provider_.set_send_blocked(false);
+ EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
+ webrtc_data_channel_->state(), 1000);
+ provider_.set_send_blocked(true);
+ webrtc_data_channel_->Send(buffer);
+ webrtc_data_channel_->Close();
+ provider_.set_send_blocked(false);
+ EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed,
+ webrtc_data_channel_->state(), 1000);
+ EXPECT_EQ(cricket::DMT_TEXT, provider_.last_send_data_params().type);
+}
+
// Tests that messages are sent with the right ssrc.
TEST_F(SctpDataChannelTest, SendDataSsrc) {
webrtc_data_channel_->SetSctpSid(1);
@@ -369,8 +404,9 @@
EXPECT_TRUE(webrtc_data_channel_->Send(packet));
}
- EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
- webrtc_data_channel_->state());
+ EXPECT_TRUE(
+ webrtc::DataChannelInterface::kClosed == webrtc_data_channel_->state() ||
+ webrtc::DataChannelInterface::kClosing == webrtc_data_channel_->state());
}
// Tests that the DataChannel is closed on transport errors.
diff --git a/talk/app/webrtc/test/fakedatachannelprovider.h b/talk/app/webrtc/test/fakedatachannelprovider.h
index bf64a94..eb86873 100644
--- a/talk/app/webrtc/test/fakedatachannelprovider.h
+++ b/talk/app/webrtc/test/fakedatachannelprovider.h
@@ -91,11 +91,15 @@
void set_send_blocked(bool blocked) {
send_blocked_ = blocked;
if (!blocked) {
- std::set<webrtc::DataChannel*>::iterator it;
- for (it = connected_channels_.begin();
- it != connected_channels_.end();
- ++it) {
- (*it)->OnChannelReady(true);
+ // Take a snapshot of the connected channels and check to see whether
+ // each value is still in connected_channels_ before calling
+ // OnChannelReady(). This avoids problems where the set gets modified
+ // in response to OnChannelReady().
+ for (webrtc::DataChannel *ch : std::set<webrtc::DataChannel*>(
+ connected_channels_.begin(), connected_channels_.end())) {
+ if (connected_channels_.count(ch)) {
+ ch->OnChannelReady(true);
+ }
}
}
}