Revert r6420 'Revert r6390 "Adds end to end DataChannel tests." Flaky on linux_memcheck'
Failing tests are disabled for memcheck.

TBR=wu@webrtc.org
BUG=2626

Review URL: https://webrtc-codereview.appspot.com/13699004

Review URL: https://webrtc-codereview.appspot.com/13699004

git-svn-id: http://webrtc.googlecode.com/svn/trunk@6422 4adac7df-926f-26a2-2b94-8c16560cd09d
diff --git a/talk/app/webrtc/peerconnectionendtoend_unittest.cc b/talk/app/webrtc/peerconnectionendtoend_unittest.cc
index 9959720..f701e06 100644
--- a/talk/app/webrtc/peerconnectionendtoend_unittest.cc
+++ b/talk/app/webrtc/peerconnectionendtoend_unittest.cc
@@ -26,6 +26,7 @@
  */
 
 #include "talk/app/webrtc/test/peerconnectiontestwrapper.h"
+#include "talk/app/webrtc/test/mockpeerconnectionobservers.h"
 #include "talk/base/gunit.h"
 #include "talk/base/logging.h"
 #include "talk/base/ssladapter.h"
@@ -33,6 +34,13 @@
 #include "talk/base/stringencode.h"
 #include "talk/base/stringutils.h"
 
+#define MAYBE_SKIP_TEST(feature)                    \
+  if (!(feature())) {                               \
+    LOG(LS_INFO) << "Feature disabled... skipping"; \
+    return;                                         \
+  }
+
+using webrtc::DataChannelInterface;
 using webrtc::FakeConstraints;
 using webrtc::MediaConstraintsInterface;
 using webrtc::MediaStreamInterface;
@@ -42,6 +50,7 @@
 
 const char kExternalGiceUfrag[] = "1234567890123456";
 const char kExternalGicePwd[] = "123456789012345678901234";
+const size_t kMaxWait = 10000;
 
 void RemoveLinesFromSdp(const std::string& line_start,
                                std::string* sdp) {
@@ -117,6 +126,9 @@
     : public sigslot::has_slots<>,
       public testing::Test {
  public:
+  typedef std::vector<talk_base::scoped_refptr<DataChannelInterface> >
+      DataChannelList;
+
   PeerConnectionEndToEndTest()
       : caller_(new talk_base::RefCountedObject<PeerConnectionTestWrapper>(
                     "caller")),
@@ -133,6 +145,11 @@
     EXPECT_TRUE(caller_->CreatePc(pc_constraints));
     EXPECT_TRUE(callee_->CreatePc(pc_constraints));
     PeerConnectionTestWrapper::Connect(caller_.get(), callee_.get());
+
+    caller_->SignalOnDataChannel.connect(
+        this, &PeerConnectionEndToEndTest::OnCallerAddedDataChanel);
+    callee_->SignalOnDataChannel.connect(
+        this, &PeerConnectionEndToEndTest::OnCalleeAddedDataChannel);
   }
 
   void GetAndAddUserMedia() {
@@ -158,6 +175,11 @@
     callee_->WaitForCallEstablished();
   }
 
+  void WaitForConnection() {
+    caller_->WaitForConnection();
+    callee_->WaitForConnection();
+  }
+
   void SetupLegacySdpConverter() {
     caller_->SignalOnSdpCreated.connect(
       this, &PeerConnectionEndToEndTest::ConvertToLegacySdp);
@@ -189,6 +211,57 @@
     LOG(LS_INFO) << "AddGiceCredsToCandidate: " << *sdp;
   }
 
+  void OnCallerAddedDataChanel(DataChannelInterface* dc) {
+    caller_signaled_data_channels_.push_back(dc);
+  }
+
+  void OnCalleeAddedDataChannel(DataChannelInterface* dc) {
+    callee_signaled_data_channels_.push_back(dc);
+  }
+
+  // Tests that |dc1| and |dc2| can send to and receive from each other.
+  void TestDataChannelSendAndReceive(
+      DataChannelInterface* dc1, DataChannelInterface* dc2) {
+    talk_base::scoped_ptr<webrtc::MockDataChannelObserver> dc1_observer(
+        new webrtc::MockDataChannelObserver(dc1));
+
+    talk_base::scoped_ptr<webrtc::MockDataChannelObserver> dc2_observer(
+        new webrtc::MockDataChannelObserver(dc2));
+
+    static const std::string kDummyData = "abcdefg";
+    webrtc::DataBuffer buffer(kDummyData);
+    EXPECT_TRUE(dc1->Send(buffer));
+    EXPECT_EQ_WAIT(kDummyData, dc2_observer->last_message(), kMaxWait);
+
+    EXPECT_TRUE(dc2->Send(buffer));
+    EXPECT_EQ_WAIT(kDummyData, dc1_observer->last_message(), kMaxWait);
+
+    EXPECT_EQ(1U, dc1_observer->received_message_count());
+    EXPECT_EQ(1U, dc2_observer->received_message_count());
+  }
+
+  void WaitForDataChannelsToOpen(DataChannelInterface* local_dc,
+                                 const DataChannelList& remote_dc_list,
+                                 size_t remote_dc_index) {
+    EXPECT_EQ_WAIT(DataChannelInterface::kOpen, local_dc->state(), kMaxWait);
+
+    EXPECT_TRUE_WAIT(remote_dc_list.size() > remote_dc_index, kMaxWait);
+    EXPECT_EQ_WAIT(DataChannelInterface::kOpen,
+                   remote_dc_list[remote_dc_index]->state(),
+                   kMaxWait);
+    EXPECT_EQ(local_dc->id(), remote_dc_list[remote_dc_index]->id());
+  }
+
+  void CloseDataChannels(DataChannelInterface* local_dc,
+                         const DataChannelList& remote_dc_list,
+                         size_t remote_dc_index) {
+    local_dc->Close();
+    EXPECT_EQ_WAIT(DataChannelInterface::kClosed, local_dc->state(), kMaxWait);
+    EXPECT_EQ_WAIT(DataChannelInterface::kClosed,
+                   remote_dc_list[remote_dc_index]->state(),
+                   kMaxWait);
+  }
+
   ~PeerConnectionEndToEndTest() {
     talk_base::CleanupSSL();
   }
@@ -196,6 +269,8 @@
  protected:
   talk_base::scoped_refptr<PeerConnectionTestWrapper> caller_;
   talk_base::scoped_refptr<PeerConnectionTestWrapper> callee_;
+  DataChannelList caller_signaled_data_channels_;
+  DataChannelList callee_signaled_data_channels_;
 };
 
 // Disable for TSan v2, see
@@ -222,4 +297,126 @@
   WaitForCallEstablished();
 }
 
+// Verifies that a DataChannel created before the negotiation can transition to
+// "OPEN" and transfer data.
+TEST_F(PeerConnectionEndToEndTest, CreateDataChannelBeforeNegotiate) {
+  MAYBE_SKIP_TEST(talk_base::SSLStreamAdapter::HaveDtlsSrtp);
+
+  CreatePcs();
+
+  webrtc::DataChannelInit init;
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc(
+      caller_->CreateDataChannel("data", init));
+  talk_base::scoped_refptr<DataChannelInterface> callee_dc(
+      callee_->CreateDataChannel("data", init));
+
+  Negotiate();
+  WaitForConnection();
+
+  WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 0);
+  WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0);
+
+  TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[0]);
+  TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0]);
+
+  CloseDataChannels(caller_dc, callee_signaled_data_channels_, 0);
+  CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
+}
+
+// Verifies that a DataChannel created after the negotiation can transition to
+// "OPEN" and transfer data.
+TEST_F(PeerConnectionEndToEndTest, CreateDataChannelAfterNegotiate) {
+  MAYBE_SKIP_TEST(talk_base::SSLStreamAdapter::HaveDtlsSrtp);
+
+  CreatePcs();
+
+  webrtc::DataChannelInit init;
+
+  // This DataChannel is for creating the data content in the negotiation.
+  talk_base::scoped_refptr<DataChannelInterface> dummy(
+      caller_->CreateDataChannel("data", init));
+  Negotiate();
+  WaitForConnection();
+
+  // Creates new DataChannels after the negotiation and verifies their states.
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc(
+      caller_->CreateDataChannel("hello", init));
+  talk_base::scoped_refptr<DataChannelInterface> callee_dc(
+      callee_->CreateDataChannel("hello", init));
+
+  WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
+  WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0);
+
+  TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1]);
+  TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0]);
+
+  CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
+  CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
+}
+
+// Verifies that DataChannel IDs are even/odd based on the DTLS roles.
+TEST_F(PeerConnectionEndToEndTest, DataChannelIdAssignment) {
+  MAYBE_SKIP_TEST(talk_base::SSLStreamAdapter::HaveDtlsSrtp);
+
+  CreatePcs();
+
+  webrtc::DataChannelInit init;
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc_1(
+      caller_->CreateDataChannel("data", init));
+  talk_base::scoped_refptr<DataChannelInterface> callee_dc_1(
+      callee_->CreateDataChannel("data", init));
+
+  Negotiate();
+  WaitForConnection();
+
+  EXPECT_EQ(1U, caller_dc_1->id() % 2);
+  EXPECT_EQ(0U, callee_dc_1->id() % 2);
+
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc_2(
+      caller_->CreateDataChannel("data", init));
+  talk_base::scoped_refptr<DataChannelInterface> callee_dc_2(
+      callee_->CreateDataChannel("data", init));
+
+  EXPECT_EQ(1U, caller_dc_2->id() % 2);
+  EXPECT_EQ(0U, callee_dc_2->id() % 2);
+}
+
+// Verifies that the message is received by the right remote DataChannel when
+// there are multiple DataChannels.
+TEST_F(PeerConnectionEndToEndTest,
+       MessageTransferBetweenTwoPairsOfDataChannels) {
+  MAYBE_SKIP_TEST(talk_base::SSLStreamAdapter::HaveDtlsSrtp);
+
+  CreatePcs();
+
+  webrtc::DataChannelInit init;
+
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc_1(
+      caller_->CreateDataChannel("data", init));
+  talk_base::scoped_refptr<DataChannelInterface> caller_dc_2(
+      caller_->CreateDataChannel("data", init));
+
+  Negotiate();
+  WaitForConnection();
+  WaitForDataChannelsToOpen(caller_dc_1, callee_signaled_data_channels_, 0);
+  WaitForDataChannelsToOpen(caller_dc_2, callee_signaled_data_channels_, 1);
+
+  talk_base::scoped_ptr<webrtc::MockDataChannelObserver> dc_1_observer(
+      new webrtc::MockDataChannelObserver(callee_signaled_data_channels_[0]));
+
+  talk_base::scoped_ptr<webrtc::MockDataChannelObserver> dc_2_observer(
+      new webrtc::MockDataChannelObserver(callee_signaled_data_channels_[1]));
+
+  const std::string message_1 = "hello 1";
+  const std::string message_2 = "hello 2";
+
+  caller_dc_1->Send(webrtc::DataBuffer(message_1));
+  EXPECT_EQ_WAIT(message_1, dc_1_observer->last_message(), kMaxWait);
+
+  caller_dc_2->Send(webrtc::DataBuffer(message_2));
+  EXPECT_EQ_WAIT(message_2, dc_2_observer->last_message(), kMaxWait);
+
+  EXPECT_EQ(1U, dc_1_observer->received_message_count());
+  EXPECT_EQ(1U, dc_2_observer->received_message_count());
+}
 #endif // if !defined(THREAD_SANITIZER)
diff --git a/talk/app/webrtc/test/mockpeerconnectionobservers.h b/talk/app/webrtc/test/mockpeerconnectionobservers.h
index e2de379..3ae2162 100644
--- a/talk/app/webrtc/test/mockpeerconnectionobservers.h
+++ b/talk/app/webrtc/test/mockpeerconnectionobservers.h
@@ -90,7 +90,7 @@
 class MockDataChannelObserver : public webrtc::DataChannelObserver {
  public:
   explicit MockDataChannelObserver(webrtc::DataChannelInterface* channel)
-     : channel_(channel) {
+     : channel_(channel), received_message_count_(0) {
     channel_->RegisterObserver(this);
     state_ = channel_->state();
   }
@@ -101,15 +101,18 @@
   virtual void OnStateChange() { state_ = channel_->state(); }
   virtual void OnMessage(const DataBuffer& buffer) {
     last_message_.assign(buffer.data.data(), buffer.data.length());
+    ++received_message_count_;
   }
 
   bool IsOpen() const { return state_ == DataChannelInterface::kOpen; }
   const std::string& last_message() const { return last_message_; }
+  size_t received_message_count() const { return received_message_count_; }
 
  private:
   talk_base::scoped_refptr<webrtc::DataChannelInterface> channel_;
   DataChannelInterface::DataState state_;
   std::string last_message_;
+  size_t received_message_count_;
 };
 
 class MockStatsObserver : public webrtc::StatsObserver {
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.cc b/talk/app/webrtc/test/peerconnectiontestwrapper.cc
index 7d3664f..be70969 100644
--- a/talk/app/webrtc/test/peerconnectiontestwrapper.cc
+++ b/talk/app/webrtc/test/peerconnectiontestwrapper.cc
@@ -103,6 +103,13 @@
   return peer_connection_.get() != NULL;
 }
 
+talk_base::scoped_refptr<webrtc::DataChannelInterface>
+PeerConnectionTestWrapper::CreateDataChannel(
+    const std::string& label,
+    const webrtc::DataChannelInit& init) {
+  return peer_connection_->CreateDataChannel(label, &init);
+}
+
 void PeerConnectionTestWrapper::OnAddStream(MediaStreamInterface* stream) {
   LOG(LS_INFO) << "PeerConnectionTestWrapper " << name_
                << ": OnAddStream";
@@ -122,6 +129,11 @@
                             sdp);
 }
 
+void PeerConnectionTestWrapper::OnDataChannel(
+    webrtc::DataChannelInterface* data_channel) {
+  SignalOnDataChannel(data_channel);
+}
+
 void PeerConnectionTestWrapper::OnSuccess(SessionDescriptionInterface* desc) {
   // This callback should take the ownership of |desc|.
   talk_base::scoped_ptr<SessionDescriptionInterface> owned_desc(desc);
diff --git a/talk/app/webrtc/test/peerconnectiontestwrapper.h b/talk/app/webrtc/test/peerconnectiontestwrapper.h
index 46fefaf..05e9b62 100644
--- a/talk/app/webrtc/test/peerconnectiontestwrapper.h
+++ b/talk/app/webrtc/test/peerconnectiontestwrapper.h
@@ -52,6 +52,10 @@
 
   bool CreatePc(const webrtc::MediaConstraintsInterface* constraints);
 
+  talk_base::scoped_refptr<webrtc::DataChannelInterface> CreateDataChannel(
+      const std::string& label,
+      const webrtc::DataChannelInit& init);
+
   // Implements PeerConnectionObserver.
   virtual void OnError() {}
   virtual void OnSignalingChange(
@@ -60,7 +64,7 @@
       webrtc::PeerConnectionObserver::StateType state_changed) {}
   virtual void OnAddStream(webrtc::MediaStreamInterface* stream);
   virtual void OnRemoveStream(webrtc::MediaStreamInterface* stream) {}
-  virtual void OnDataChannel(webrtc::DataChannelInterface* data_channel) {}
+  virtual void OnDataChannel(webrtc::DataChannelInterface* data_channel);
   virtual void OnRenegotiationNeeded() {}
   virtual void OnIceConnectionChange(
       webrtc::PeerConnectionInterface::IceConnectionState new_state) {}
@@ -94,6 +98,7 @@
                    const std::string&> SignalOnIceCandidateReady;
   sigslot::signal1<std::string*> SignalOnSdpCreated;
   sigslot::signal1<const std::string&> SignalOnSdpReady;
+  sigslot::signal1<webrtc::DataChannelInterface*> SignalOnDataChannel;
 
  private:
   void SetLocalDescription(const std::string& type, const std::string& sdp);