Update libjingle to 50191337.
R=mallinath@webrtc.org
Review URL: https://webrtc-codereview.appspot.com/1885005
git-svn-id: http://webrtc.googlecode.com/svn/trunk@4461 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 345cd5f..4972424 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -34,7 +34,8 @@
namespace webrtc {
-static size_t kMaxQueuedDataPackets = 100;
+static size_t kMaxQueuedReceivedDataPackets = 100;
+static size_t kMaxQueuedSendDataPackets = 100;
talk_base::scoped_refptr<DataChannel> DataChannel::Create(
WebRtcSession* session,
@@ -95,12 +96,13 @@
}
DataChannel::~DataChannel() {
- ClearQueuedData();
+ ClearQueuedReceivedData();
+ ClearQueuedSendData();
}
void DataChannel::RegisterObserver(DataChannelObserver* observer) {
observer_ = observer;
- DeliverQueuedData();
+ DeliverQueuedReceivedData();
}
void DataChannel::UnregisterObserver() {
@@ -117,7 +119,13 @@
}
uint64 DataChannel::buffered_amount() const {
- return 0;
+ uint64 buffered_amount = 0;
+ for (std::deque<DataBuffer*>::const_iterator it = queued_send_data_.begin();
+ it != queued_send_data_.end();
+ ++it) {
+ buffered_amount += (*it)->size();
+ }
+ return buffered_amount;
}
void DataChannel::Close() {
@@ -133,20 +141,22 @@
if (state_ != kOpen) {
return false;
}
- cricket::SendDataParams send_params;
-
- send_params.ssrc = send_ssrc_;
- if (session_->data_channel_type() == cricket::DCT_SCTP) {
- send_params.ordered = config_.ordered;
- send_params.max_rtx_count = config_.maxRetransmits;
- send_params.max_rtx_ms = config_.maxRetransmitTime;
+ // If the queue is non-empty, we're waiting for SignalReadyToSend,
+ // so just add to the end of the queue and keep waiting.
+ if (!queued_send_data_.empty()) {
+ return QueueSendData(buffer);
}
- send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
cricket::SendDataResult send_result;
- // TODO(pthatcher): Use send_result.would_block for buffering.
- return session_->data_channel()->SendData(
- send_params, buffer.data, &send_result);
+ if (!InternalSendWithoutQueueing(buffer, &send_result)) {
+ if (send_result == cricket::SDR_BLOCK) {
+ return QueueSendData(buffer);
+ }
+ // Fail for other results.
+ // TODO(jiayl): We should close the data channel in this case.
+ return false;
+ }
+ return true;
}
void DataChannel::SetReceiveSsrc(uint32 receive_ssrc) {
@@ -183,6 +193,43 @@
DoClose();
}
+void DataChannel::OnDataReceived(cricket::DataChannel* channel,
+ const cricket::ReceiveDataParams& params,
+ const talk_base::Buffer& payload) {
+ if (params.ssrc != receive_ssrc_) {
+ return;
+ }
+
+ bool binary = (params.type == cricket::DMT_BINARY);
+ talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
+ if (was_ever_writable_ && observer_) {
+ observer_->OnMessage(*buffer.get());
+ } else {
+ if (queued_received_data_.size() > kMaxQueuedReceivedDataPackets) {
+ // TODO(jiayl): We should close the data channel in this case.
+ LOG(LS_ERROR)
+ << "Queued received data exceeds the max number of packes.";
+ ClearQueuedReceivedData();
+ }
+ queued_received_data_.push(buffer.release());
+ }
+}
+
+void DataChannel::OnChannelReady(bool writable) {
+ if (!writable) {
+ return;
+ }
+ // Update the readyState if the channel is writable for the first time;
+ // otherwise it means the channel was blocked for sending and now unblocked,
+ // so send the queued data now.
+ if (!was_ever_writable_) {
+ was_ever_writable_ = true;
+ UpdateState();
+ } else if (state_ == kOpen) {
+ SendQueuedSendData();
+ }
+}
+
void DataChannel::DoClose() {
receive_ssrc_set_ = false;
send_ssrc_set_ = false;
@@ -201,7 +248,7 @@
SetState(kOpen);
// If we have received buffers before the channel got writable.
// Deliver them now.
- DeliverQueuedData();
+ DeliverQueuedReceivedData();
}
}
break;
@@ -249,47 +296,76 @@
data_session_ = NULL;
}
-void DataChannel::DeliverQueuedData() {
- if (was_ever_writable_ && observer_) {
- while (!queued_data_.empty()) {
- DataBuffer* buffer = queued_data_.front();
- observer_->OnMessage(*buffer);
- queued_data_.pop();
- delete buffer;
- }
+void DataChannel::DeliverQueuedReceivedData() {
+ if (!was_ever_writable_ || !observer_) {
+ return;
}
-}
-void DataChannel::ClearQueuedData() {
- while (!queued_data_.empty()) {
- DataBuffer* buffer = queued_data_.front();
- queued_data_.pop();
+ while (!queued_received_data_.empty()) {
+ DataBuffer* buffer = queued_received_data_.front();
+ observer_->OnMessage(*buffer);
+ queued_received_data_.pop();
delete buffer;
}
}
-void DataChannel::OnDataReceived(cricket::DataChannel* channel,
- const cricket::ReceiveDataParams& params,
- const talk_base::Buffer& payload) {
- if (params.ssrc == receive_ssrc_) {
- bool binary = false;
- talk_base::scoped_ptr<DataBuffer> buffer(new DataBuffer(payload, binary));
- if (was_ever_writable_ && observer_) {
- observer_->OnMessage(*buffer.get());
- } else {
- if (queued_data_.size() > kMaxQueuedDataPackets) {
- ClearQueuedData();
- }
- queued_data_.push(buffer.release());
- }
+void DataChannel::ClearQueuedReceivedData() {
+ while (!queued_received_data_.empty()) {
+ DataBuffer* buffer = queued_received_data_.front();
+ queued_received_data_.pop();
+ delete buffer;
}
}
-void DataChannel::OnChannelReady(bool writable) {
- if (!was_ever_writable_ && writable) {
- was_ever_writable_ = true;
- UpdateState();
+void DataChannel::SendQueuedSendData() {
+ if (!was_ever_writable_) {
+ return;
}
+
+ while (!queued_send_data_.empty()) {
+ DataBuffer* buffer = queued_send_data_.front();
+ cricket::SendDataResult send_result;
+ if (!InternalSendWithoutQueueing(*buffer, &send_result)) {
+ LOG(LS_WARNING) << "SendQueuedSendData aborted due to send_result "
+ << send_result;
+ break;
+ }
+ queued_send_data_.pop_front();
+ delete buffer;
+ }
+}
+
+void DataChannel::ClearQueuedSendData() {
+ while (!queued_received_data_.empty()) {
+ DataBuffer* buffer = queued_received_data_.front();
+ queued_received_data_.pop();
+ delete buffer;
+ }
+}
+
+bool DataChannel::InternalSendWithoutQueueing(
+ const DataBuffer& buffer, cricket::SendDataResult* send_result) {
+ cricket::SendDataParams send_params;
+
+ send_params.ssrc = send_ssrc_;
+ if (session_->data_channel_type() == cricket::DCT_SCTP) {
+ send_params.ordered = config_.ordered;
+ send_params.max_rtx_count = config_.maxRetransmits;
+ send_params.max_rtx_ms = config_.maxRetransmitTime;
+ }
+ send_params.type = buffer.binary ? cricket::DMT_BINARY : cricket::DMT_TEXT;
+
+ return session_->data_channel()->SendData(send_params, buffer.data,
+ send_result);
+}
+
+bool DataChannel::QueueSendData(const DataBuffer& buffer) {
+ if (queued_send_data_.size() > kMaxQueuedSendDataPackets) {
+ LOG(LS_ERROR) << "Can't buffer any more data in the data channel.";
+ return false;
+ }
+ queued_send_data_.push_back(new DataBuffer(buffer));
+ return true;
}
} // namespace webrtc
diff --git a/talk/app/webrtc/datachannel.h b/talk/app/webrtc/datachannel.h
index c79c491..440ee15 100644
--- a/talk/app/webrtc/datachannel.h
+++ b/talk/app/webrtc/datachannel.h
@@ -109,8 +109,13 @@
void ConnectToDataSession();
void DisconnectFromDataSession();
bool IsConnectedToDataSession() { return data_session_ != NULL; }
- void DeliverQueuedData();
- void ClearQueuedData();
+ void DeliverQueuedReceivedData();
+ void ClearQueuedReceivedData();
+ void SendQueuedSendData();
+ void ClearQueuedSendData();
+ bool InternalSendWithoutQueueing(const DataBuffer& buffer,
+ cricket::SendDataResult* send_result);
+ bool QueueSendData(const DataBuffer& buffer);
std::string label_;
DataChannelInit config_;
@@ -123,7 +128,8 @@
uint32 send_ssrc_;
bool receive_ssrc_set_;
uint32 receive_ssrc_;
- std::queue<DataBuffer*> queued_data_;
+ std::queue<DataBuffer*> queued_received_data_;
+ std::deque<DataBuffer*> queued_send_data_;
};
class DataChannelFactory {
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc
new file mode 100644
index 0000000..d3faf17
--- /dev/null
+++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -0,0 +1,129 @@
+/*
+ * libjingle
+ * Copyright 2013, Google Inc.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * 3. The name of the author may not be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
+ * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+ * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
+ * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "talk/app/webrtc/datachannel.h"
+#include "talk/app/webrtc/mediastreamsignaling.h"
+#include "talk/app/webrtc/test/fakeconstraints.h"
+#include "talk/app/webrtc/webrtcsession.h"
+#include "talk/base/gunit.h"
+#include "talk/media/base/fakemediaengine.h"
+#include "talk/media/devices/fakedevicemanager.h"
+#include "talk/session/media/channelmanager.h"
+
+using webrtc::MediaConstraintsInterface;
+
+const uint32 kFakeSsrc = 1;
+
+class SctpDataChannelTest : public testing::Test {
+ protected:
+ SctpDataChannelTest()
+ : media_engine_(new cricket::FakeMediaEngine),
+ data_engine_(new cricket::FakeDataEngine),
+ channel_manager_(
+ new cricket::ChannelManager(media_engine_,
+ data_engine_,
+ new cricket::FakeDeviceManager(),
+ new cricket::CaptureManager(),
+ talk_base::Thread::Current())),
+ session_(channel_manager_.get(),
+ talk_base::Thread::Current(),
+ talk_base::Thread::Current(),
+ NULL,
+ new webrtc::MediaStreamSignaling(talk_base::Thread::Current(),
+ NULL)),
+ webrtc_data_channel_(NULL) {}
+
+ virtual void SetUp() {
+ if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+ return;
+ }
+ channel_manager_->Init();
+ webrtc::FakeConstraints constraints;
+ constraints.AddMandatory(MediaConstraintsInterface::kEnableDtlsSrtp, true);
+ constraints.AddMandatory(MediaConstraintsInterface::kEnableSctpDataChannels,
+ true);
+ ASSERT_TRUE(session_.Initialize(&constraints));
+ webrtc::SessionDescriptionInterface* offer = session_.CreateOffer(NULL);
+ ASSERT_TRUE(offer != NULL);
+ ASSERT_TRUE(session_.SetLocalDescription(offer, NULL));
+
+ webrtc_data_channel_ = webrtc::DataChannel::Create(&session_, "test", NULL);
+ // Connect to the media channel.
+ webrtc_data_channel_->SetSendSsrc(kFakeSsrc);
+ webrtc_data_channel_->SetReceiveSsrc(kFakeSsrc);
+
+ session_.data_channel()->SignalReadyToSendData(true);
+ }
+
+ void SetSendBlocked(bool blocked) {
+ bool was_blocked = data_engine_->GetChannel(0)->is_send_blocked();
+ data_engine_->GetChannel(0)->set_send_blocked(blocked);
+ if (!blocked && was_blocked) {
+ session_.data_channel()->SignalReadyToSendData(true);
+ }
+ }
+
+ cricket::FakeMediaEngine* media_engine_;
+ cricket::FakeDataEngine* data_engine_;
+ talk_base::scoped_ptr<cricket::ChannelManager> channel_manager_;
+ webrtc::WebRtcSession session_;
+ talk_base::scoped_refptr<webrtc::DataChannel> webrtc_data_channel_;
+};
+
+// Tests that DataChannel::buffered_amount() is correct after the channel is
+// blocked.
+TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
+ if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+ return;
+ }
+ webrtc::DataBuffer buffer("abcd");
+ EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+
+ EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+
+ SetSendBlocked(true);
+ const int number_of_packets = 3;
+ for (int i = 0; i < number_of_packets; ++i) {
+ EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+ }
+ EXPECT_EQ(buffer.data.length() * number_of_packets,
+ webrtc_data_channel_->buffered_amount());
+}
+
+// Tests that the queued data are sent when the channel transitions from blocked
+// to unblocked.
+TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
+ if (!talk_base::SSLStreamAdapter::HaveDtlsSrtp()) {
+ return;
+ }
+ webrtc::DataBuffer buffer("abcd");
+ SetSendBlocked(true);
+ EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+
+ SetSendBlocked(false);
+ EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+}
diff --git a/talk/app/webrtc/datachannelinterface.h b/talk/app/webrtc/datachannelinterface.h
index 6054e1b..82d375c 100644
--- a/talk/app/webrtc/datachannelinterface.h
+++ b/talk/app/webrtc/datachannelinterface.h
@@ -75,6 +75,8 @@
: data(text.data(), text.length()),
binary(false) {
}
+ size_t size() const { return data.length(); }
+
talk_base::Buffer data;
// Indicates if the received data contains UTF-8 or binary data.
// Note that the upper layers are left to verify the UTF-8 encoding.