Implements TODOs for webrtc::datachannel state management when the SCTP association is congested.  Adds missing state variables for each step in the transitions between DataChannelInterface::DataStates (kConnecting, kOpen, etc.), and uses them.

BUG=https://code.google.com/p/chromium/issues/detail?id=474650
R=jiayl@webrtc.org, pthatcher@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/44299004

Cr-Commit-Position: refs/heads/master@{#9331}
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 1897b73..559eec5 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -109,24 +109,26 @@
       state_(kConnecting),
       data_channel_type_(dct),
       provider_(provider),
-      waiting_for_open_ack_(false),
-      was_ever_writable_(false),
+      handshake_state_(kHandshakeInit),
       connected_to_provider_(false),
       send_ssrc_set_(false),
       receive_ssrc_set_(false),
+      writable_(false),
       send_ssrc_(0),
       receive_ssrc_(0) {
 }
 
 bool DataChannel::Init(const InternalDataChannelInit& config) {
-  if (data_channel_type_ == cricket::DCT_RTP &&
-      (config.reliable ||
-       config.id != -1 ||
-       config.maxRetransmits != -1 ||
-       config.maxRetransmitTime != -1)) {
-    LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
-                  << "invalid DataChannelInit.";
-    return false;
+  if (data_channel_type_ == cricket::DCT_RTP) {
+    if (config.reliable ||
+        config.id != -1 ||
+        config.maxRetransmits != -1 ||
+        config.maxRetransmitTime != -1) {
+      LOG(LS_ERROR) << "Failed to initialize the RTP data channel due to "
+                    << "invalid DataChannelInit.";
+      return false;
+    }
+    handshake_state_ = kHandshakeReady;
   } else if (data_channel_type_ == cricket::DCT_SCTP) {
     if (config.id < -1 ||
         config.maxRetransmits < -1 ||
@@ -142,6 +144,18 @@
     }
     config_ = config;
 
+    switch (config_.open_handshake_role) {
+    case webrtc::InternalDataChannelInit::kNone:  // pre-negotiated
+      handshake_state_ = kHandshakeReady;
+      break;
+    case webrtc::InternalDataChannelInit::kOpener:
+      handshake_state_ = kHandshakeShouldSendOpen;
+      break;
+    case webrtc::InternalDataChannelInit::kAcker:
+      handshake_state_ = kHandshakeShouldSendAck;
+      break;
+    };
+
     // Try to connect to the transport in case the transport channel already
     // exists.
     OnTransportChannelCreated();
@@ -298,7 +312,7 @@
 
   if (params.type == cricket::DMT_CONTROL) {
     ASSERT(data_channel_type_ == cricket::DCT_SCTP);
-    if (!waiting_for_open_ack_) {
+    if (handshake_state_ != kHandshakeWaitingForAck) {
       // Ignore it if we are not expecting an ACK message.
       LOG(LS_WARNING) << "DataChannel received unexpected CONTROL message, "
                       << "sid = " << params.ssrc;
@@ -306,7 +320,7 @@
     }
     if (ParseDataChannelOpenAckMessage(payload)) {
       // We can send unordered as soon as we receive the ACK message.
-      waiting_for_open_ack_ = false;
+      handshake_state_ = kHandshakeReady;
       LOG(LS_INFO) << "DataChannel received OPEN_ACK message, sid = "
                    << params.ssrc;
     } else {
@@ -323,11 +337,13 @@
   // We can send unordered as soon as we receive any DATA message since the
   // remote side must have received the OPEN (and old clients do not send
   // OPEN_ACK).
-  waiting_for_open_ack_ = false;
+  if (handshake_state_ == kHandshakeWaitingForAck) {
+    handshake_state_ = kHandshakeReady;
+  }
 
   bool binary = (params.type == cricket::DMT_BINARY);
   rtc::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
-  if (was_ever_writable_ && observer_) {
+  if (state_ == kOpen && observer_) {
     observer_->OnMessage(*buffer.get());
   } else {
     if (queued_received_data_.byte_count() + payload.size() >
@@ -346,38 +362,14 @@
 }
 
 void DataChannel::OnChannelReady(bool writable) {
+  writable_ = writable;
   if (!writable) {
     return;
   }
-  // Update the readyState and send the queued control message if the channel
-  // is writable for the first time; otherwise it means the channel was blocked
-  // for sending and now unblocked, so send the queued data now.
-  if (!was_ever_writable_) {
-    was_ever_writable_ = true;
 
-    if (data_channel_type_ == cricket::DCT_SCTP) {
-      rtc::Buffer payload;
-
-      if (config_.open_handshake_role == InternalDataChannelInit::kOpener) {
-        WriteDataChannelOpenMessage(label_, config_, &payload);
-        SendControlMessage(payload);
-      } else if (config_.open_handshake_role ==
-                     InternalDataChannelInit::kAcker) {
-        WriteDataChannelOpenAckMessage(&payload);
-        SendControlMessage(payload);
-      }
-    }
-
-    UpdateState();
-    ASSERT(queued_send_data_.Empty());
-  } else if (state_ == kOpen) {
-    // TODO(jiayl): Sending OPEN message here contradicts with the pre-condition
-    // that the readyState is open. According to the standard, the channel
-    // should not become open before the OPEN message is sent.
-    SendQueuedControlMessages();
-
-    SendQueuedDataMessages();
-  }
+  SendQueuedControlMessages();
+  SendQueuedDataMessages();
+  UpdateState();
 }
 
 void DataChannel::DoClose() {
@@ -391,20 +383,34 @@
 }
 
 void DataChannel::UpdateState() {
+  // UpdateState determines what to do from a few state variables.  Include
+  // all conditions required for each state transition here for
+  // clarity. OnChannelReady(true) will send any queued data and then invoke
+  // UpdateState().
   switch (state_) {
     case kConnecting: {
       if (send_ssrc_set_ == receive_ssrc_set_) {
         if (data_channel_type_ == cricket::DCT_RTP && !connected_to_provider_) {
           connected_to_provider_ = provider_->ConnectDataChannel(this);
         }
-        if (was_ever_writable_) {
-          // TODO(jiayl): Do not transition to kOpen if we failed to send the
-          // OPEN message.
-          SendQueuedControlMessages();
-          SetState(kOpen);
-          // If we have received buffers before the channel got writable.
-          // Deliver them now.
-          DeliverQueuedReceivedData();
+        if (connected_to_provider_) {
+          if (handshake_state_ == kHandshakeShouldSendOpen) {
+            rtc::Buffer payload;
+            WriteDataChannelOpenMessage(label_, config_, &payload);
+            SendControlMessage(payload);
+          } else if (handshake_state_ == kHandshakeShouldSendAck) {
+            rtc::Buffer payload;
+            WriteDataChannelOpenAckMessage(&payload);
+            SendControlMessage(payload);
+          }
+          if (writable_ &&
+              (handshake_state_ == kHandshakeReady ||
+               handshake_state_ == kHandshakeWaitingForAck)) {
+            SetState(kOpen);
+            // If we have received buffers before the channel got writable.
+            // Deliver them now.
+            DeliverQueuedReceivedData();
+          }
         }
       }
       break;
@@ -413,10 +419,14 @@
       break;
     }
     case kClosing: {
-      DisconnectFromTransport();
+      if (queued_send_data_.Empty() && queued_control_data_.Empty()) {
+        if (connected_to_provider_) {
+          DisconnectFromProvider();
+        }
 
-      if (!send_ssrc_set_ && !receive_ssrc_set_) {
-        SetState(kClosed);
+        if (!connected_to_provider_ && !send_ssrc_set_ && !receive_ssrc_set_) {
+          SetState(kClosed);
+        }
       }
       break;
     }
@@ -435,7 +445,7 @@
   }
 }
 
-void DataChannel::DisconnectFromTransport() {
+void DataChannel::DisconnectFromProvider() {
   if (!connected_to_provider_)
     return;
 
@@ -448,7 +458,7 @@
 }
 
 void DataChannel::DeliverQueuedReceivedData() {
-  if (!was_ever_writable_ || !observer_) {
+  if (!observer_) {
     return;
   }
 
@@ -460,7 +470,11 @@
 }
 
 void DataChannel::SendQueuedDataMessages() {
-  ASSERT(was_ever_writable_ && state_ == kOpen);
+  if (queued_send_data_.Empty()) {
+    return;
+  }
+
+  ASSERT(state_ == kOpen || state_ == kClosing);
 
   while (!queued_send_data_.Empty()) {
     DataBuffer* buffer = queued_send_data_.Front();
@@ -479,8 +493,8 @@
 
   if (data_channel_type_ == cricket::DCT_SCTP) {
     send_params.ordered = config_.ordered;
-    // Send as ordered if it is waiting for the OPEN_ACK message.
-    if (waiting_for_open_ack_ && !config_.ordered) {
+    // Send as ordered if it is still going through OPEN/ACK signaling.
+    if (handshake_state_ != kHandshakeReady && !config_.ordered) {
       send_params.ordered = true;
       LOG(LS_VERBOSE) << "Sending data as ordered for unordered DataChannel "
                       << "because the OPEN_ACK message has not been received.";
@@ -529,8 +543,6 @@
 }
 
 void DataChannel::SendQueuedControlMessages() {
-  ASSERT(was_ever_writable_);
-
   PacketQueue control_packets;
   control_packets.Swap(&queued_control_data_);
 
@@ -546,16 +558,18 @@
 }
 
 bool DataChannel::SendControlMessage(const rtc::Buffer& buffer) {
-  bool is_open_message =
-      (config_.open_handshake_role == InternalDataChannelInit::kOpener);
+  bool is_open_message = handshake_state_ == kHandshakeShouldSendOpen;
 
   ASSERT(data_channel_type_ == cricket::DCT_SCTP &&
-         was_ever_writable_ &&
+         writable_ &&
          config_.id >= 0 &&
          (!is_open_message || !config_.negotiated));
 
   cricket::SendDataParams send_params;
   send_params.ssrc = config_.id;
+  // Send data as ordered before we receive any message from the remote peer to
+  // make sure the remote peer will not receive any data before it receives the
+  // OPEN message.
   send_params.ordered = config_.ordered || is_open_message;
   send_params.type = cricket::DMT_CONTROL;
 
@@ -564,11 +578,10 @@
   if (retval) {
     LOG(LS_INFO) << "Sent CONTROL message on channel " << config_.id;
 
-    if (is_open_message) {
-      // Send data as ordered before we receive any message from the remote peer
-      // to make sure the remote peer will not receive any data before it
-      // receives the OPEN message.
-      waiting_for_open_ack_ = true;
+    if (handshake_state_ == kHandshakeShouldSendAck) {
+      handshake_state_ = kHandshakeReady;
+    } else if (handshake_state_ == kHandshakeShouldSendOpen) {
+      handshake_state_ = kHandshakeWaitingForAck;
     }
   } else if (send_result == cricket::SDR_BLOCK) {
     QueueControlMessage(buffer);
diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h
index fe8fac1..8e58d06 100644
--- a/talk/app/webrtc/datachannel.h
+++ b/talk/app/webrtc/datachannel.h
@@ -204,11 +204,20 @@
     size_t byte_count_;
   };
 
+  // The OPEN(_ACK) signaling state.
+  enum HandshakeState {
+    kHandshakeInit,
+    kHandshakeShouldSendOpen,
+    kHandshakeShouldSendAck,
+    kHandshakeWaitingForAck,
+    kHandshakeReady
+  };
+
   bool Init(const InternalDataChannelInit& config);
   void DoClose();
   void UpdateState();
   void SetState(DataState state);
-  void DisconnectFromTransport();
+  void DisconnectFromProvider();
 
   void DeliverQueuedReceivedData();
 
@@ -226,11 +235,11 @@
   DataState state_;
   cricket::DataChannelType data_channel_type_;
   DataChannelProviderInterface* provider_;
-  bool waiting_for_open_ack_;
-  bool was_ever_writable_;
+  HandshakeState handshake_state_;
   bool connected_to_provider_;
   bool send_ssrc_set_;
   bool receive_ssrc_set_;
+  bool writable_;
   uint32 send_ssrc_;
   uint32 receive_ssrc_;
   // Control messages that always have to get sent out before any queued
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc
index ab5dbe9..bc4f81c 100644
--- a/talk/app/webrtc/datachannel_unittest.cc
+++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -269,6 +269,41 @@
   EXPECT_FALSE(provider_.last_send_data_params().ordered);
 }
 
+// Tests that the channel can't open until it's successfully sent the OPEN
+// message.
+TEST_F(SctpDataChannelTest, OpenWaitsForOpenMesssage) {
+  webrtc::DataBuffer buffer("foo");
+
+  provider_.set_send_blocked(true);
+  SetChannelReady();
+  EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
+            webrtc_data_channel_->state());
+  provider_.set_send_blocked(false);
+  EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
+                 webrtc_data_channel_->state(), 1000);
+  EXPECT_EQ(cricket::DMT_CONTROL, provider_.last_send_data_params().type);
+}
+
+// Tests that close first makes sure all queued data gets sent.
+TEST_F(SctpDataChannelTest, QueuedCloseFlushes) {
+  webrtc::DataBuffer buffer("foo");
+
+  provider_.set_send_blocked(true);
+  SetChannelReady();
+  EXPECT_EQ(webrtc::DataChannelInterface::kConnecting,
+            webrtc_data_channel_->state());
+  provider_.set_send_blocked(false);
+  EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kOpen,
+                 webrtc_data_channel_->state(), 1000);
+  provider_.set_send_blocked(true);
+  webrtc_data_channel_->Send(buffer);
+  webrtc_data_channel_->Close();
+  provider_.set_send_blocked(false);
+  EXPECT_EQ_WAIT(webrtc::DataChannelInterface::kClosed,
+                 webrtc_data_channel_->state(), 1000);
+  EXPECT_EQ(cricket::DMT_TEXT, provider_.last_send_data_params().type);
+}
+
 // Tests that messages are sent with the right ssrc.
 TEST_F(SctpDataChannelTest, SendDataSsrc) {
   webrtc_data_channel_->SetSctpSid(1);
@@ -369,8 +404,9 @@
     EXPECT_TRUE(webrtc_data_channel_->Send(packet));
   }
 
-  EXPECT_EQ(webrtc::DataChannelInterface::kClosed,
-            webrtc_data_channel_->state());
+  EXPECT_TRUE(
+      webrtc::DataChannelInterface::kClosed == webrtc_data_channel_->state() ||
+      webrtc::DataChannelInterface::kClosing == webrtc_data_channel_->state());
 }
 
 // Tests that the DataChannel is closed on transport errors.
diff --git a/talk/app/webrtc/test/fakedatachannelprovider.h b/talk/app/webrtc/test/fakedatachannelprovider.h
index bf64a94..eb86873 100644
--- a/talk/app/webrtc/test/fakedatachannelprovider.h
+++ b/talk/app/webrtc/test/fakedatachannelprovider.h
@@ -91,11 +91,15 @@
   void set_send_blocked(bool blocked) {
     send_blocked_ = blocked;
     if (!blocked) {
-      std::set<webrtc::DataChannel*>::iterator it;
-      for (it = connected_channels_.begin();
-           it != connected_channels_.end();
-           ++it) {
-        (*it)->OnChannelReady(true);
+      // Take a snapshot of the connected channels and check to see whether
+      // each value is still in connected_channels_ before calling
+      // OnChannelReady().  This avoids problems where the set gets modified
+      // in response to OnChannelReady().
+      for (webrtc::DataChannel *ch : std::set<webrtc::DataChannel*>(
+               connected_channels_.begin(), connected_channels_.end())) {
+        if (connected_channels_.count(ch)) {
+          ch->OnChannelReady(true);
+        }
       }
     }
   }