Support for onbufferedamountlow

Original review at https://webrtc-codereview.appspot.com/54679004/

BUG=https://code.google.com/p/chromium/issues/detail?id=496700

Review URL: https://codereview.webrtc.org/1207613006

Cr-Commit-Position: refs/heads/master@{#9527}
diff --git a/talk/app/webrtc/datachannel.cc b/talk/app/webrtc/datachannel.cc
index 559eec5..690ee65 100644
--- a/talk/app/webrtc/datachannel.cc
+++ b/talk/app/webrtc/datachannel.cc
@@ -476,6 +476,7 @@
 
   ASSERT(state_ == kOpen || state_ == kClosing);
 
+  uint64 start_buffered_amount = buffered_amount();
   while (!queued_send_data_.Empty()) {
     DataBuffer* buffer = queued_send_data_.Front();
     if (!SendDataMessage(*buffer, false)) {
@@ -485,6 +486,10 @@
     queued_send_data_.Pop();
     delete buffer;
   }
+
+  if (observer_ && buffered_amount() < start_buffered_amount) {
+    observer_->OnBufferedAmountChange(start_buffered_amount);
+  }
 }
 
 bool DataChannel::SendDataMessage(const DataBuffer& buffer,
@@ -534,11 +539,17 @@
 }
 
 bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
-  if (queued_send_data_.byte_count() >= kMaxQueuedSendDataBytes) {
+  size_t start_buffered_amount = buffered_amount();
+  if (start_buffered_amount >= kMaxQueuedSendDataBytes) {
     LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
     return false;
   }
   queued_send_data_.Push(new DataBuffer(buffer));
+
+  // The buffer can have length zero, in which case there is no change.
+  if (observer_ && buffered_amount() > start_buffered_amount) {
+    observer_->OnBufferedAmountChange(start_buffered_amount);
+  }
   return true;
 }
 
diff --git a/talk/app/webrtc/datachannel_unittest.cc b/talk/app/webrtc/datachannel_unittest.cc
index bc4f81c..e3c290b 100644
--- a/talk/app/webrtc/datachannel_unittest.cc
+++ b/talk/app/webrtc/datachannel_unittest.cc
@@ -35,12 +35,18 @@
 class FakeDataChannelObserver : public webrtc::DataChannelObserver {
  public:
   FakeDataChannelObserver()
-      : messages_received_(0), on_state_change_count_(0) {}
+      : messages_received_(0),
+        on_state_change_count_(0),
+        on_buffered_amount_change_count_(0) {}
 
   void OnStateChange() {
     ++on_state_change_count_;
   }
 
+  void OnBufferedAmountChange(uint64 previous_amount) {
+    ++on_buffered_amount_change_count_;
+  }
+
   void OnMessage(const webrtc::DataBuffer& buffer) {
     ++messages_received_;
   }
@@ -53,13 +59,22 @@
     on_state_change_count_ = 0;
   }
 
+  void ResetOnBufferedAmountChangeCount() {
+    on_buffered_amount_change_count_ = 0;
+  }
+
   size_t on_state_change_count() const {
     return on_state_change_count_;
   }
 
+  size_t on_buffered_amount_change_count() const {
+    return on_buffered_amount_change_count_;
+  }
+
  private:
   size_t messages_received_;
   size_t on_state_change_count_;
+  size_t on_buffered_amount_change_count_;
 };
 
 class SctpDataChannelTest : public testing::Test {
@@ -133,11 +148,13 @@
 // Tests that DataChannel::buffered_amount() is correct after the channel is
 // blocked.
 TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
+  AddObserver();
   SetChannelReady();
   webrtc::DataBuffer buffer("abcd");
   EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
 
   EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+  EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
 
   provider_.set_send_blocked(true);
 
@@ -147,37 +164,46 @@
   }
   EXPECT_EQ(buffer.data.size() * number_of_packets,
             webrtc_data_channel_->buffered_amount());
+  EXPECT_EQ(number_of_packets, observer_->on_buffered_amount_change_count());
 }
 
 // Tests that the queued data are sent when the channel transitions from blocked
 // to unblocked.
 TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
+  AddObserver();
   SetChannelReady();
   webrtc::DataBuffer buffer("abcd");
   provider_.set_send_blocked(true);
   EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
 
+  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
+
   provider_.set_send_blocked(false);
   SetChannelReady();
   EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+  EXPECT_EQ(2U, observer_->on_buffered_amount_change_count());
 }
 
 // Tests that no crash when the channel is blocked right away while trying to
 // send queued data.
 TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) {
+  AddObserver();
   SetChannelReady();
   webrtc::DataBuffer buffer("abcd");
   provider_.set_send_blocked(true);
   EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
+  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
 
   // Set channel ready while it is still blocked.
   SetChannelReady();
   EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount());
+  EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
 
   // Unblock the channel to send queued data again, there should be no crash.
   provider_.set_send_blocked(false);
   SetChannelReady();
   EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
+  EXPECT_EQ(2U, observer_->on_buffered_amount_change_count());
 }
 
 // Tests that the queued control message is sent when channel is ready.
diff --git a/talk/app/webrtc/datachannelinterface.h b/talk/app/webrtc/datachannelinterface.h
index 6312262..90573eb 100644
--- a/talk/app/webrtc/datachannelinterface.h
+++ b/talk/app/webrtc/datachannelinterface.h
@@ -91,6 +91,8 @@
   virtual void OnStateChange() = 0;
   //  A data buffer was successfully received.
   virtual void OnMessage(const DataBuffer& buffer) = 0;
+  // The data channel's buffered_amount has changed.
+  virtual void OnBufferedAmountChange(uint64 previous_amount){};
 
  protected:
   virtual ~DataChannelObserver() {}
diff --git a/talk/app/webrtc/java/jni/peerconnection_jni.cc b/talk/app/webrtc/java/jni/peerconnection_jni.cc
index 69078e6..2b6cbc7 100644
--- a/talk/app/webrtc/java/jni/peerconnection_jni.cc
+++ b/talk/app/webrtc/java/jni/peerconnection_jni.cc
@@ -559,16 +559,24 @@
       : j_observer_global_(jni, j_observer),
         j_observer_class_(jni, GetObjectClass(jni, j_observer)),
         j_buffer_class_(jni, FindClass(jni, "org/webrtc/DataChannel$Buffer")),
-        j_on_state_change_mid_(GetMethodID(jni, *j_observer_class_,
-                                           "onStateChange", "()V")),
+        j_on_buffered_amount_change_mid_(GetMethodID(
+            jni, *j_observer_class_, "onBufferedAmountChange", "(J)V")),
+        j_on_state_change_mid_(
+            GetMethodID(jni, *j_observer_class_, "onStateChange", "()V")),
         j_on_message_mid_(GetMethodID(jni, *j_observer_class_, "onMessage",
                                       "(Lorg/webrtc/DataChannel$Buffer;)V")),
-        j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_,
-                                   "<init>", "(Ljava/nio/ByteBuffer;Z)V")) {
-  }
+        j_buffer_ctor_(GetMethodID(jni, *j_buffer_class_, "<init>",
+                                   "(Ljava/nio/ByteBuffer;Z)V")) {}
 
   virtual ~DataChannelObserverWrapper() {}
 
+  void OnBufferedAmountChange(uint64 previous_amount) override {
+    ScopedLocalRefFrame local_ref_frame(jni());
+    jni()->CallVoidMethod(*j_observer_global_, j_on_buffered_amount_change_mid_,
+                          previous_amount);
+    CHECK_EXCEPTION(jni()) << "error during CallVoidMethod";
+  }
+
   void OnStateChange() override {
     ScopedLocalRefFrame local_ref_frame(jni());
     jni()->CallVoidMethod(*j_observer_global_, j_on_state_change_mid_);
@@ -593,6 +601,7 @@
   const ScopedGlobalRef<jobject> j_observer_global_;
   const ScopedGlobalRef<jclass> j_observer_class_;
   const ScopedGlobalRef<jclass> j_buffer_class_;
+  const jmethodID j_on_buffered_amount_change_mid_;
   const jmethodID j_on_state_change_mid_;
   const jmethodID j_on_message_mid_;
   const jmethodID j_buffer_ctor_;
diff --git a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java
index deee84b..1866098 100644
--- a/talk/app/webrtc/java/src/org/webrtc/DataChannel.java
+++ b/talk/app/webrtc/java/src/org/webrtc/DataChannel.java
@@ -77,6 +77,8 @@
 
   /** Java version of C++ DataChannelObserver. */
   public interface Observer {
+    /** The data channel's bufferedAmount has changed. */
+    public void onBufferedAmountChange(long previousAmount);
     /** The data channel state has changed. */
     public void onStateChange();
     /**
diff --git a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java
index 224225c..b87f484 100644
--- a/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java
+++ b/talk/app/webrtc/java/testcommon/src/org/webrtc/PeerConnectionTest.java
@@ -259,6 +259,11 @@
     }
 
     @Override
+    public synchronized void onBufferedAmountChange(long previousAmount) {
+      assertFalse(previousAmount == dataChannel.bufferedAmount());
+    }
+
+    @Override
     public synchronized void onStateChange() {
       assertEquals(expectedStateChanges.removeFirst(), dataChannel.state());
     }
diff --git a/talk/app/webrtc/objc/RTCDataChannel.mm b/talk/app/webrtc/objc/RTCDataChannel.mm
index 4fb03c2..8a9b6b6 100644
--- a/talk/app/webrtc/objc/RTCDataChannel.mm
+++ b/talk/app/webrtc/objc/RTCDataChannel.mm
@@ -43,6 +43,15 @@
     [_channel.delegate channelDidChangeState:_channel];
   }
 
+  void OnBufferedAmountChange(uint64 previousAmount) override {
+    RTCDataChannel* channel = _channel;
+    id<RTCDataChannelDelegate> delegate = channel.delegate;
+    if ([delegate
+            respondsToSelector:@selector(channel:didChangeBufferedAmount:)]) {
+      [delegate channel:channel didChangeBufferedAmount:previousAmount];
+    }
+  }
+
   void OnMessage(const DataBuffer& buffer) override {
     if (!_channel.delegate) {
       return;
diff --git a/talk/app/webrtc/objc/public/RTCDataChannel.h b/talk/app/webrtc/objc/public/RTCDataChannel.h
index 7c22580..24a46f6 100644
--- a/talk/app/webrtc/objc/public/RTCDataChannel.h
+++ b/talk/app/webrtc/objc/public/RTCDataChannel.h
@@ -82,6 +82,12 @@
 - (void)channel:(RTCDataChannel*)channel
     didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer;
 
+@optional
+
+// Called when the buffered amount has changed.
+- (void)channel:(RTCDataChannel*)channel
+    didChangeBufferedAmount:(NSUInteger)amount;
+
 @end
 
 // ObjectiveC wrapper for a DataChannel object.
diff --git a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m
index 5c76672..5070b78 100644
--- a/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m
+++ b/talk/app/webrtc/objctests/RTCPeerConnectionSyncObserver.m
@@ -231,6 +231,12 @@
 }
 
 - (void)channel:(RTCDataChannel*)channel
+    didChangeBufferedAmount:(NSUInteger)previousAmount {
+  NSAssert(channel.bufferedAmount != previousAmount,
+           @"Invalid bufferedAmount change");
+}
+
+- (void)channel:(RTCDataChannel*)channel
     didReceiveMessageWithBuffer:(RTCDataBuffer*)buffer {
   NSAssert([_expectedMessages count] > 0,
            @"Unexpected message received");
diff --git a/talk/app/webrtc/test/mockpeerconnectionobservers.h b/talk/app/webrtc/test/mockpeerconnectionobservers.h
index 580a0fb..d2697b4 100644
--- a/talk/app/webrtc/test/mockpeerconnectionobservers.h
+++ b/talk/app/webrtc/test/mockpeerconnectionobservers.h
@@ -98,8 +98,10 @@
     channel_->UnregisterObserver();
   }
 
-  virtual void OnStateChange() { state_ = channel_->state(); }
-  virtual void OnMessage(const DataBuffer& buffer) {
+  void OnBufferedAmountChange(uint64 previous_amount) override {}
+
+  void OnStateChange() override { state_ = channel_->state(); }
+  void OnMessage(const DataBuffer& buffer) override {
     last_message_.assign(buffer.data.data<char>(), buffer.data.size());
     ++received_message_count_;
   }