Update libjingle to 50191337.

R=mallinath@webrtc.org

Review URL: https://webrtc-codereview.appspot.com/1885005

git-svn-id: http://webrtc.googlecode.com/svn/trunk@4461 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 345cd5f..4972424 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -34,7 +34,8 @@
 
 namespace webrtc {
 
-static size_t kMaxQueuedDataPackets = 100;
+static size_t kMaxQueuedReceivedDataPackets = 100;
+static size_t kMaxQueuedSendDataPackets = 100;
 
 talk_base::scoped_refptr<DataChannel> DataChannel::Create(
     WebRtcSession* session,
@@ -95,12 +96,13 @@
 }
 
 DataChannel::~DataChannel() {
-  ClearQueuedData();
+  ClearQueuedReceivedData();
+  ClearQueuedSendData();
 }
 
 void DataChannel::RegisterObserver(DataChannelObserver* observer) {
   observer_ = observer;
-  DeliverQueuedData();
+  DeliverQueuedReceivedData();
 }
 
 void DataChannel::UnregisterObserver() {
@@ -117,7 +119,13 @@
 }
 
 uint64 DataChannel::buffered_amount() const {
-  return 0;
+  uint64 buffered_amount = 0;
+  for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
+      it != queued_send_data_.end();
+      ++it) {
+    buffered_amount += (*it)->size();
+  }
+  return buffered_amount;
 }
 
 void DataChannel::Close() {
@@ -133,20 +141,22 @@
   if (state_ != kOpen) {
     return false;
   }
-  cricket::SendDataParams send_params;
-
-  send_params.ssrc = send_ssrc_;
-  if (session_->data_channel_type() == cricket::DCT_SCTP) {
-    send_params.ordered = config_.ordered;
-    send_params.max_rtx_count = config_.maxRetransmits;
-    send_params.max_rtx_ms = config_.maxRetransmitTime;
+  // If the queue is non-empty, we're waiting for SignalReadyToSend,
+  // so just add to the end of the queue and keep waiting.
+  if (!queued_send_data_.empty()) {
+    return QueueSendData(buffer);
   }
-  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
 
   cricket::SendDataResult send_result;
-  // TODO(pthatcher): Use send_result.would_block for buffering.
-  return session_->data_channel()->SendData(
-      send_params, buffer.data, &send_result);
+  if (!InternalSendWithoutQueueing(buffer, &send_result)) {
+    if (send_result == cricket::SDR_BLOCK) {
+      return QueueSendData(buffer);
+    }
+    // Fail for other results.
+    // TODO(jiayl): We should close the data channel in this case.
+    return false;
+  }
+  return true;
 }
 
 void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@@ -183,6 +193,43 @@
   DoClose();
 }
 
+void DataChannel::OnDataReceived(cricket::DataChannel* channel,
+                                 const cricket::ReceiveDataParams& params,
+                                 const talk_base::Buffer& payload) {
+  if (params.ssrc != receive_ssrc_) {
+    return;
+  }
+
+  bool binary = (params.type == cricket::DMT_BINARY);
+  talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
+  if (was_ever_writable_ && observer_) {
+    observer_->OnMessage(*buffer.get());
+  } else {
+    if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
+      // TODO(jiayl): We should close the data channel in this case.
+      LOG(LS_ERROR)
+          << "Queued received data exceeds the max number of packes.";
+      ClearQueuedReceivedData();
+    }
+    queued_received_data_.push(buffer.release());
+  }
+}
+
+void DataChannel::OnChannelReady(bool writable) {
+  if (!writable) {
+    return;
+  }
+  // Update the readyState if the channel is writable for the first time;
+  // otherwise it means the channel was blocked for sending and now unblocked,
+  // so send the queued data now.
+  if (!was_ever_writable_) {
+    was_ever_writable_ = true;
+    UpdateState();
+  } else if (state_ == kOpen) {
+    SendQueuedSendData();
+  }
+}
+
 void DataChannel::DoClose() {
   receive_ssrc_set_ = false;
   send_ssrc_set_ = false;
@@ -201,7 +248,7 @@
           SetState(kOpen);
           // If we have received buffers before the channel got writable.
           // Deliver them now.
-          DeliverQueuedData();
+          DeliverQueuedReceivedData();
         }
       }
       break;
@@ -249,47 +296,76 @@
   data_session_ = NULL;
 }
 
-void DataChannel::DeliverQueuedData() {
-  if (was_ever_writable_ && observer_) {
-    while (!queued_data_.empty()) {
-      DataBuffer* buffer = queued_data_.front();
-      observer_->OnMessage(*buffer);
-      queued_data_.pop();
-      delete buffer;
-    }
+void DataChannel::DeliverQueuedReceivedData() {
+  if (!was_ever_writable_ || !observer_) {
+    return;
   }
-}
 
-void DataChannel::ClearQueuedData() {
-  while (!queued_data_.empty()) {
-    DataBuffer* buffer = queued_data_.front();
-    queued_data_.pop();
+  while (!queued_received_data_.empty()) {
+    DataBuffer* buffer = queued_received_data_.front();
+    observer_->OnMessage(*buffer);
+    queued_received_data_.pop();
     delete buffer;
   }
 }
 
-void DataChannel::OnDataReceived(cricket::DataChannel* channel,
-                                 const cricket::ReceiveDataParams& params,
-                                 const talk_base::Buffer& payload) {
-  if (params.ssrc == receive_ssrc_) {
-    bool binary = false;
-    talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
-    if (was_ever_writable_ && observer_) {
-      observer_->OnMessage(*buffer.get());
-    } else {
-      if (queued_data_.size() > kMaxQueuedDataPackets) {
-        ClearQueuedData();
-      }
-      queued_data_.push(buffer.release());
-    }
+void DataChannel::ClearQueuedReceivedData() {
+  while (!queued_received_data_.empty()) {
+    DataBuffer* buffer = queued_received_data_.front();
+    queued_received_data_.pop();
+    delete buffer;
   }
 }
 
-void DataChannel::OnChannelReady(bool writable) {
-  if (!was_ever_writable_ && writable) {
-    was_ever_writable_ = true;
-    UpdateState();
+void DataChannel::SendQueuedSendData() {
+  if (!was_ever_writable_) {
+    return;
   }
+
+  while (!queued_send_data_.empty()) {
+    DataBuffer* buffer = queued_send_data_.front();
+    cricket::SendDataResult send_result;
+    if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
+      LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
+                      << send_result;
+      break;
+    }
+    queued_send_data_.pop_front();
+    delete buffer;
+  }
+}
+
+void DataChannel::ClearQueuedSendData() {
+  while (!queued_received_data_.empty()) {
+    DataBuffer* buffer = queued_received_data_.front();
+    queued_received_data_.pop();
+    delete buffer;
+  }
+}
+
+bool DataChannel::InternalSendWithoutQueueing(
+    const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+  cricket::SendDataParams send_params;
+
+  send_params.ssrc = send_ssrc_;
+  if (session_->data_channel_type() == cricket::DCT_SCTP) {
+    send_params.ordered = config_.ordered;
+    send_params.max_rtx_count = config_.maxRetransmits;
+    send_params.max_rtx_ms = config_.maxRetransmitTime;
+  }
+  send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
+
+  return session_->data_channel()->SendData(send_params, buffer.data,
+                                            send_result);
+}
+
+bool DataChannel::QueueSendData(const DataBuffer& buffer) {
+  if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
+    LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
+    return false;
+  }
+  queued_send_data_.push_back(new DataBuffer(buffer));
+  return true;
 }
 
 }  // namespace webrtc
diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h
index c79c491..440ee15 100644
--- a/talk/app/webrtc/datachannel.h
+++ b/talk/app/webrtc/datachannel.h
@@ -109,8 +109,13 @@
   void ConnectToDataSession();
   void DisconnectFromDataSession();
   bool IsConnectedToDataSession() { return data_session_ != NULL; }
-  void DeliverQueuedData();
-  void ClearQueuedData();
+  void DeliverQueuedReceivedData();
+  void ClearQueuedReceivedData();
+  void SendQueuedSendData();
+  void ClearQueuedSendData();
+  bool InternalSendWithoutQueueing(const DataBuffer& buffer,
+                                   cricket::SendDataResult* send_result);
+  bool QueueSendData(const DataBuffer& buffer);
 
   std::string label_;
   DataChannelInit config_;
@@ -123,7 +128,8 @@
   uint32 send_ssrc_;
   bool receive_ssrc_set_;
   uint32 receive_ssrc_;
-  std::queue<DataBuffer*> queued_data_;
+  std::queue<DataBuffer*> queued_received_data_;
+  std::deque<DataBuffer*> queued_send_data_;
 };
 
 class DataChannelFactory {
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc
new file mode 100644
index 0000000..d3faf17
--- /dev/null
+++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -0,0 +1,129 @@
+/*
+ * libjingle
+ * Copyright 2013, Google Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ *  1. Redistributions of source code must retain the above copyright notice,
+ *     this list of conditions and the following disclaimer.
+ *  2. Redistributions in binary form must reproduce the above copyright notice,
+ *     this list of conditions and the following disclaimer in the documentation
+ *     and/or other materials provided with the distribution.
+ *  3. The name of the author may not be used to endorse or promote products
+ *     derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "talk/app/webrtc/datachannel.h"
+#include "talk/app/webrtc/mediastreamsignaling.h"
+#include "talk/app/webrtc/test/fakeconstraints.h"
+#include "talk/app/webrtc/webrtcsession.h"
+#include "talk/base/gunit.h"
+#include "talk/media/base/fakemediaengine.h"
+#include "talk/media/devices/fakedevicemanager.h"
+#include "talk/session/media/channelmanager.h"
+
+using webrtc::MediaConstraintsInterface;
+
+const uint32 kFakeSsrc = 1;
+
+class SctpDataChannelTest : public testing::Test {
+ protected:
+  SctpDataChannelTest()
+      : media_engine_(new cricket::FakeMediaEngine),
+        data_engine_(new cricket::FakeDataEngine),
+        channel_manager_(
+            new cricket::ChannelManager(media_engine_,
+                                        data_engine_,
+                                        new cricket::FakeDeviceManager(),
+                                        new cricket::CaptureManager(),
+                                        talk_base::Thread::Current())),
+        session_(channel_manager_.get(),
+                 talk_base::Thread::Current(),
+                 talk_base::Thread::Current(),
+                 NULL,
+                 new webrtc::MediaStreamSignaling(talk_base::Thread::Current(),
+                                                  NULL)),
+        webrtc_data_channel_(NULL) {}
+
+  virtual void SetUp() {
+    if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+      return;
+    }
+    channel_manager_->Init();
+    webrtc::FakeConstraints constraints;
+    constraints.AddMandatory(MediaConstraintsInterface::kEnableDtlsSrtp, true);
+    constraints.AddMandatory(MediaConstraintsInterface::kEnableSctpDataChannels,
+                             true);
+    ASSERT_TRUE(session_.Initialize(&constraints));
+    webrtc::SessionDescriptionInterface* offer = session_.CreateOffer(NULL);
+    ASSERT_TRUE(offer != NULL);
+    ASSERT_TRUE(session_.SetLocalDescription(offer, NULL));
+
+    webrtc_data_channel_ = webrtc::DataChannel::Create(&session_, "test", NULL);
+    // Connect to the media channel.
+    webrtc_data_channel_->SetSendSsrc(kFakeSsrc);
+    webrtc_data_channel_->SetReceiveSsrc(kFakeSsrc);
+
+    session_.data_channel()->SignalReadyToSendData(true);
+  }
+
+  void SetSendBlocked(bool blocked) {
+    bool was_blocked = data_engine_->GetChannel(0)->is_send_blocked();
+    data_engine_->GetChannel(0)->set_send_blocked(blocked);
+    if (!blocked && was_blocked) {
+      session_.data_channel()->SignalReadyToSendData(true);
+    }
+  }
+
+  cricket::FakeMediaEngine* media_engine_;
+  cricket::FakeDataEngine* data_engine_;
+  talk_base::scoped_ptr<cricket::ChannelManager> channel_manager_;
+  webrtc::WebRtcSession session_;
+  talk_base::scoped_refptr<webrtc::DataChannel> webrtc_data_channel_;
+};
+
+// Tests that DataChannel::buffered_amount() is correct after the channel is
+// blocked.
+TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
+  if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+    return;
+  }
+  webrtc::DataBuffer buffer("abcd");
+  EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+
+  EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+
+  SetSendBlocked(true);
+  const int number_of_packets = 3;
+  for (int i = 0; i < number_of_packets; ++i) {
+    EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+  }
+  EXPECT_EQ(buffer.data.length() * number_of_packets,
+            webrtc_data_channel_->buffered_amount());
+}
+
+// Tests that the queued data are sent when the channel transitions from blocked
+// to unblocked.
+TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
+  if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+    return;
+  }
+  webrtc::DataBuffer buffer("abcd");
+  SetSendBlocked(true);
+  EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+
+  SetSendBlocked(false);
+  EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+}
diff --git a/talk/app/webrtc/datachannelinterface.h b/talk/app/webrtc/datachannelinterface.h
index 6054e1b..82d375c 100644
--- a/talk/app/webrtc/datachannelinterface.h
+++ b/talk/app/webrtc/datachannelinterface.h
@@ -75,6 +75,8 @@
       : data(text.data(), text.length()),
         binary(false) {
   }
+  size_t size() const { return data.length(); }
+
   talk_base::Buffer data;
   // Indicates if the received data contains UTF-8 or binary data.
   // Note that the upper layers are left to verify the UTF-8 encoding.