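Add support for onbufferedamountlow: DataChannelObserver gains an OnBufferedAmountChange(previous_amount) callback, which is also exposed through the Java and Objective-C data channel observer APIs. Original review at https://webrtc-codereview.appspot.com/54679004/ BUG=https://code.google.com/p/chromium/issues/detail?id=496700 Review URL: https://codereview.webrtc.org/1207613006 Cr-Commit-Position: refs/heads/master@{#9527}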
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc index 559eec5..690ee65 100644 --- a/talk/app/webrtc/datachannel.cc +++ b/talk/app/webrtc/datachannel.cc
@@ -476,6 +476,7 @@ ASSERT(state_ == kOpen || state_ == kClosing); + uint64 start_buffered_amount = buffered_amount(); while (!queued_send_data_.Empty()) { DataBuffer* buffer = queued_send_data_.Front(); if (!SendDataMessage(*buffer, false)) { @@ -485,6 +486,10 @@ queued_send_data_.Pop(); delete buffer; } + + if (observer_ && buffered_amount() < start_buffered_amount) { + observer_->OnBufferedAmountChange(start_buffered_amount); + } } bool DataChannel::SendDataMessage(const DataBuffer& buffer, @@ -534,11 +539,17 @@ } bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) { + size_t start_buffered_amount = buffered_amount(); + if (start_buffered_amount >= kMaxQueuedSendDataBytes) { LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } queued_send_data_.Push(new DataBuffer(buffer)); + + // The buffer can have length zero, in which case there is no change. + if (observer_ && buffered_amount() > start_buffered_amount) { + observer_->OnBufferedAmountChange(start_buffered_amount); + } return true; }
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc index bc4f81c..e3c290b 100644 --- a/talk/app/webrtc/datachannel_unittest.cc +++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -35,12 +35,18 @@ class FakeDataChannelObserver : public webrtc::DataChannelObserver { public: FakeDataChannelObserver() - : messages_received_(0), on_state_change_count_(0) {} + : messages_received_(0), + on_state_change_count_(0), + on_buffered_amount_change_count_(0) {} void OnStateChange() { ++on_state_change_count_; } + void OnBufferedAmountChange(uint64 previous_amount) { + ++on_buffered_amount_change_count_; + } + void OnMessage(const webrtc::DataBuffer& buffer) { ++messages_received_; } @@ -53,13 +59,22 @@ on_state_change_count_ = 0; } + void ResetOnBufferedAmountChangeCount() { + on_buffered_amount_change_count_ = 0; + } + size_t on_state_change_count() const { return on_state_change_count_; } + size_t on_buffered_amount_change_count() const { + return on_buffered_amount_change_count_; + } + private: size_t messages_received_; size_t on_state_change_count_; + size_t on_buffered_amount_change_count_; }; class SctpDataChannelTest : public testing::Test { @@ -133,11 +148,13 @@ // Tests that DataChannel::buffered_amount() is correct after the channel is // blocked. TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); provider_.set_send_blocked(true); @@ -147,37 +164,46 @@ } EXPECT_EQ(buffer.data.size() * number_of_packets, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(number_of_packets, observer_->on_buffered_amount_change_count()); } // Tests that the queued data are sent when the channel transitions from blocked // to unblocked. 
TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); provider_.set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + provider_.set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); } // Tests that no crash when the channel is blocked right away while trying to // send queued data. TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { + AddObserver(); SetChannelReady(); webrtc::DataBuffer buffer("abcd"); provider_.set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); // Set channel ready while it is still blocked. SetChannelReady(); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); // Unblock the channel to send queued data again, there should be no crash. provider_.set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); } // Tests that the queued control message is sent when channel is ready.
diff --git a/talk/app/webrtc/datachannelinterface.h b/talk/app/webrtc/datachannelinterface.h index 6312262..90573eb 100644 --- a/talk/app/webrtc/datachannelinterface.h +++ b/talk/app/webrtc/datachannelinterface.h
@@ -91,6 +91,8 @@ virtual void OnStateChange() = 0; // A data buffer was successfully received. virtual void OnMessage(const DataBuffer& buffer) = 0; + // The data channel's buffered_amount has changed. + virtual void OnBufferedAmountChange(uint64 previous_amount){}; protected: virtual ~DataChannelObserver() {}
diff --git a/talk/app/webrtc/java/jni/peerconnection_jni.cc b/talk/app/webrtc/java/jni/peerconnection_jni.cc index 69078e6..2b6cbc7 100644 --- a/talk/app/webrtc/java/jni/peerconnection_jni.cc +++ b/talk/app/webrtc/java/jni/peerconnection_jni.cc
@@ -559,16 +559,24 @@ : j_observer_global_(jni, j_observer), j_observer_class_(jni, GetObjectClass(jni, j_observer)), j_buffer_class_(jni, FindClass(jni, "org/webrtc/DataChannel$Buffer")), - j_on_state_change_mid_(GetMethodID(jni, *j_observer_class_, - "onStateChange", "()V")), + j_on_buffered_amount_change_mid_(GetMethodID( + jni, *j_observer_class_, "onBufferedAmountChange", "(J)V")), + j_on_state_change_mid_( + GetMethodID(jni, *j_observer_class_, "onStateChange", "()V")), j_on_message_mid_(GetMethodID(jni, *j_observer_class_, "onMessage", "(Lorg/webrtc/DataChannel$Buffer;)V")), - j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, - "<init>", "(Ljava/nio/ByteBuffer;Z)V")) { - } + j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, "<init>", + "(Ljava/nio/ByteBuffer;Z)V")) {} virtual ~DataChannelObserverWrapper() {} + void OnBufferedAmountChange(uint64 previous_amount) override { + ScopedLocalRefFrame local_ref_frame(jni()); + jni()->CallVoidMethod(*j_observer_global_, j_on_buffered_amount_change_mid_, + previous_amount); + CHECK_EXCEPTION(jni()) << "error during CallVoidMethod"; + } + void OnStateChange() override { ScopedLocalRefFrame local_ref_frame(jni()); jni()->CallVoidMethod(*j_observer_global_, j_on_state_change_mid_); @@ -593,6 +601,7 @@ const ScopedGlobalRef<jobject> j_observer_global_; const ScopedGlobalRef<jclass> j_observer_class_; const ScopedGlobalRef<jclass> j_buffer_class_; + const jmethodID j_on_buffered_amount_change_mid_; const jmethodID j_on_state_change_mid_; const jmethodID j_on_message_mid_; const jmethodID j_buffer_ctor_;
diff --git a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java index deee84b..1866098 100644 --- a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java +++ b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java
@@ -77,6 +77,8 @@ /** Java version of C++ DataChannelObserver. */ public interface Observer { + /** The data channel's bufferedAmount has changed. */ + public void onBufferedAmountChange(long previousAmount); /** The data channel state has changed. */ public void onStateChange(); /**
diff --git a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java index 224225c..b87f484 100644 --- a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java +++ b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java
@@ -259,6 +259,11 @@ } @Override + public synchronized void onBufferedAmountChange(long previousAmount) { + assertFalse(previousAmount == dataChannel.bufferedAmount()); + } + + @Override public synchronized void onStateChange() { assertEquals(expectedStateChanges.removeFirst(), dataChannel.state()); }
diff --git a/talk/app/webrtc/objc/RTCDataChannel.mm b/talk/app/webrtc/objc/RTCDataChannel.mm index 4fb03c2..8a9b6b6 100644 --- a/talk/app/webrtc/objc/RTCDataChannel.mm +++ b/talk/app/webrtc/objc/RTCDataChannel.mm
@@ -43,6 +43,15 @@ [_channel.delegate channelDidChangeState:_channel]; } + void OnBufferedAmountChange(uint64 previousAmount) override { + RTCDataChannel* channel = _channel; + id<RTCDataChannelDelegate> delegate = channel.delegate; + if ([delegate + respondsToSelector:@selector(channel:didChangeBufferedAmount:)]) { + [delegate channel:channel didChangeBufferedAmount:previousAmount]; + } + } + void OnMessage(const DataBuffer& buffer) override { if (!_channel.delegate) { return;
diff --git a/talk/app/webrtc/objc/public/RTCDataChannel.h b/talk/app/webrtc/objc/public/RTCDataChannel.h index 7c22580..24a46f6 100644 --- a/talk/app/webrtc/objc/public/RTCDataChannel.h +++ b/talk/app/webrtc/objc/public/RTCDataChannel.h
@@ -82,6 +82,12 @@ - (void)channel:(RTCDataChannel*)channel didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer; +@optional + +// Called when the buffered amount has changed; |amount| is the previous value. +- (void)channel:(RTCDataChannel*)channel + didChangeBufferedAmount:(NSUInteger)amount; + @end // ObjectiveC wrapper for a DataChannel object.
diff --git a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m index 5c76672..5070b78 100644 --- a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m +++ b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m
@@ -231,6 +231,12 @@ } - (void)channel:(RTCDataChannel*)channel + didChangeBufferedAmount:(NSUInteger)previousAmount { + NSAssert(channel.bufferedAmount != previousAmount, + @"Invalid bufferedAmount change"); +} + +- (void)channel:(RTCDataChannel*)channel didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer { NSAssert([_expectedMessages count] > 0, @"Unexpected message received");
diff --git a/talk/app/webrtc/test/mockpeerconnectionobservers.h b/talk/app/webrtc/test/mockpeerconnectionobservers.h index 580a0fb..d2697b4 100644 --- a/talk/app/webrtc/test/mockpeerconnectionobservers.h +++ b/talk/app/webrtc/test/mockpeerconnectionobservers.h
@@ -98,8 +98,10 @@ channel_->UnregisterObserver(); } - virtual void OnStateChange() { state_ = channel_->state(); } - virtual void OnMessage(const DataBuffer& buffer) { + void OnBufferedAmountChange(uint64 previous_amount) override {} + + void OnStateChange() override { state_ = channel_->state(); } + void OnMessage(const DataBuffer& buffer) override { last_message_.assign(buffer.data.data<char>(), buffer.data.size()); ++received_message_count_; }