Implement data channel methods in LoopbackMediaTransport.

This enables PeerConnection tests to use LoopbackMediaTransport to test
data-channel-over-media-transport code.

Also changes LoopbackMediaTransport to invoke callbacks asynchronously.
This is more accurate, as these callbacks are triggered by network
events.  The caller should not block while the callback executes.

Since LoopbackMediaTransport is used for testing, it provides a
FlushAsyncInvokes() method which may be used to ensure that callbacks
occur deterministically (eg. before checking that data has been
received).

Bug: webrtc:9719
Change-Id: Ib8ea9aebf4a0ad3d5934a6fe4ab33432c68523fd
Tbr: stefan@webrtc.org
Reviewed-on: https://webrtc-review.googlesource.com/c/109060
Commit-Queue: Bjorn Mellem <mellem@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#25489}
diff --git a/api/BUILD.gn b/api/BUILD.gn
index 7e74c1a..4aa42d4 100644
--- a/api/BUILD.gn
+++ b/api/BUILD.gn
@@ -611,6 +611,7 @@
     deps = [
       ":libjingle_peerconnection_api",
       "../rtc_base:checks",
+      "../rtc_base:rtc_base",
     ]
   }
 
diff --git a/api/test/DEPS b/api/test/DEPS
index 98b1ad3..1fc1f7c 100644
--- a/api/test/DEPS
+++ b/api/test/DEPS
@@ -5,4 +5,9 @@
   ".*": [
     "+video"
   ],
+  "loopback_media_transport\.h": [
+    "+rtc_base/asyncinvoker.h",
+    "+rtc_base/criticalsection.h",
+    "+rtc_base/thread.h",
+  ],
 }
diff --git a/api/test/loopback_media_transport.h b/api/test/loopback_media_transport.h
index 9ce50a7..f3f24d4 100644
--- a/api/test/loopback_media_transport.h
+++ b/api/test/loopback_media_transport.h
@@ -14,6 +14,9 @@
 #include <utility>
 
 #include "api/media_transport_interface.h"
+#include "rtc_base/asyncinvoker.h"
+#include "rtc_base/criticalsection.h"
+#include "rtc_base/thread.h"
 
 namespace webrtc {
 
@@ -21,24 +24,36 @@
 // Currently supports audio only.
 class MediaTransportPair {
  public:
-  MediaTransportPair()
-      : pipe_{LoopbackMediaTransport(&pipe_[1]),
-              LoopbackMediaTransport(&pipe_[0])} {}
+  explicit MediaTransportPair(rtc::Thread* thread)
+      : first_(thread, &second_), second_(thread, &first_) {}
 
   // Ownership stays with MediaTransportPair
-  MediaTransportInterface* first() { return &pipe_[0]; }
-  MediaTransportInterface* second() { return &pipe_[1]; }
+  MediaTransportInterface* first() { return &first_; }
+  MediaTransportInterface* second() { return &second_; }
+
+  void FlushAsyncInvokes() {
+    first_.FlushAsyncInvokes();
+    second_.FlushAsyncInvokes();
+  }
 
  private:
   class LoopbackMediaTransport : public MediaTransportInterface {
    public:
-    explicit LoopbackMediaTransport(LoopbackMediaTransport* other)
-        : other_(other) {}
-    ~LoopbackMediaTransport() { RTC_CHECK(sink_ == nullptr); }
+    LoopbackMediaTransport(rtc::Thread* thread, LoopbackMediaTransport* other)
+        : thread_(thread), other_(other) {}
+
+    ~LoopbackMediaTransport() {
+      rtc::CritScope lock(&sink_lock_);
+      RTC_CHECK(sink_ == nullptr);
+      RTC_CHECK(data_sink_ == nullptr);
+    }
 
     RTCError SendAudioFrame(uint64_t channel_id,
                             MediaTransportEncodedAudioFrame frame) override {
-      other_->OnData(channel_id, std::move(frame));
+      invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_,
+                                 [this, channel_id, frame] {
+                                   other_->OnData(channel_id, std::move(frame));
+                                 });
       return RTCError::OK();
     };
 
@@ -53,6 +68,7 @@
     }
 
     void SetReceiveAudioSink(MediaTransportAudioSinkInterface* sink) override {
+      rtc::CritScope lock(&sink_lock_);
       if (sink) {
         RTC_CHECK(sink_ == nullptr);
       }
@@ -70,27 +86,69 @@
     RTCError SendData(int channel_id,
                       const SendDataParams& params,
                       const rtc::CopyOnWriteBuffer& buffer) override {
-      return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented");
+      invoker_.AsyncInvoke<void>(
+          RTC_FROM_HERE, thread_, [this, channel_id, params, buffer] {
+            other_->OnData(channel_id, params.type, buffer);
+          });
+      return RTCError::OK();
     }
 
     RTCError CloseChannel(int channel_id) override {
-      return RTCError(RTCErrorType::UNSUPPORTED_OPERATION, "Not implemented");
+      invoker_.AsyncInvoke<void>(RTC_FROM_HERE, thread_, [this, channel_id] {
+        other_->OnRemoteCloseChannel(channel_id);
+        rtc::CritScope lock(&sink_lock_);
+        if (data_sink_) {
+          data_sink_->OnChannelClosed(channel_id);
+        }
+      });
+      return RTCError::OK();
     }
 
-    void SetDataSink(DataChannelSink* sink) override {}
+    void SetDataSink(DataChannelSink* sink) override {
+      rtc::CritScope lock(&sink_lock_);
+      data_sink_ = sink;
+    }
+
+    void FlushAsyncInvokes() { invoker_.Flush(thread_); }
 
    private:
     void OnData(uint64_t channel_id, MediaTransportEncodedAudioFrame frame) {
+      rtc::CritScope lock(&sink_lock_);
       if (sink_) {
         sink_->OnData(channel_id, frame);
       }
     }
 
-    MediaTransportAudioSinkInterface* sink_ = nullptr;
-    LoopbackMediaTransport* other_;
+    void OnData(int channel_id,
+                DataMessageType type,
+                const rtc::CopyOnWriteBuffer& buffer) {
+      rtc::CritScope lock(&sink_lock_);
+      if (data_sink_) {
+        data_sink_->OnDataReceived(channel_id, type, buffer);
+      }
+    }
+
+    void OnRemoteCloseChannel(int channel_id) {
+      rtc::CritScope lock(&sink_lock_);
+      if (data_sink_) {
+        data_sink_->OnChannelClosing(channel_id);
+        data_sink_->OnChannelClosed(channel_id);
+      }
+    }
+
+    rtc::Thread* const thread_;
+    rtc::CriticalSection sink_lock_;
+
+    MediaTransportAudioSinkInterface* sink_ RTC_GUARDED_BY(sink_lock_) =
+        nullptr;
+    DataChannelSink* data_sink_ RTC_GUARDED_BY(sink_lock_) = nullptr;
+    LoopbackMediaTransport* const other_;
+
+    rtc::AsyncInvoker invoker_;
   };
 
-  LoopbackMediaTransport pipe_[2];
+  LoopbackMediaTransport first_;
+  LoopbackMediaTransport second_;
 };
 
 }  // namespace webrtc
diff --git a/api/test/loopback_media_transport_unittest.cc b/api/test/loopback_media_transport_unittest.cc
index bff74b8..f85413c 100644
--- a/api/test/loopback_media_transport_unittest.cc
+++ b/api/test/loopback_media_transport_unittest.cc
@@ -8,6 +8,7 @@
  *  be found in the AUTHORS file in the root of the source tree.
  */
 
+#include <memory>
 #include <vector>
 
 #include "api/test/loopback_media_transport.h"
@@ -23,6 +24,14 @@
   MOCK_METHOD2(OnData, void(uint64_t, MediaTransportEncodedAudioFrame));
 };
 
+class MockDataChannelSink : public DataChannelSink {
+ public:
+  MOCK_METHOD3(OnDataReceived,
+               void(int, DataMessageType, const rtc::CopyOnWriteBuffer&));
+  MOCK_METHOD1(OnChannelClosing, void(int));
+  MOCK_METHOD1(OnChannelClosed, void(int));
+};
+
 // Test only uses the sequence number.
 MediaTransportEncodedAudioFrame CreateAudioFrame(int sequence_number) {
   static constexpr int kSamplingRateHz = 48000;
@@ -39,13 +48,18 @@
 }  // namespace
 
 TEST(LoopbackMediaTransport, AudioWithNoSinkSilentlyIgnored) {
-  MediaTransportPair transport_pair;
+  std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
+  thread->Start();
+  MediaTransportPair transport_pair(thread.get());
   transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(0));
   transport_pair.second()->SendAudioFrame(2, CreateAudioFrame(0));
+  transport_pair.FlushAsyncInvokes();
 }
 
 TEST(LoopbackMediaTransport, AudioDeliveredToSink) {
-  MediaTransportPair transport_pair;
+  std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
+  thread->Start();
+  MediaTransportPair transport_pair(thread.get());
   testing::StrictMock<MockMediaTransportAudioSinkInterface> sink;
   EXPECT_CALL(sink,
               OnData(1, testing::Property(
@@ -54,7 +68,58 @@
   transport_pair.second()->SetReceiveAudioSink(&sink);
   transport_pair.first()->SendAudioFrame(1, CreateAudioFrame(10));
 
+  transport_pair.FlushAsyncInvokes();
   transport_pair.second()->SetReceiveAudioSink(nullptr);
 }
 
+TEST(LoopbackMediaTransport, DataDeliveredToSink) {
+  std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
+  thread->Start();
+  MediaTransportPair transport_pair(thread.get());
+
+  MockDataChannelSink sink;
+  transport_pair.first()->SetDataSink(&sink);
+
+  const int channel_id = 1;
+  EXPECT_CALL(sink,
+              OnDataReceived(
+                  channel_id, DataMessageType::kText,
+                  testing::Property<rtc::CopyOnWriteBuffer, const char*>(
+                      &rtc::CopyOnWriteBuffer::cdata, testing::StrEq("foo"))));
+
+  SendDataParams params;
+  params.type = DataMessageType::kText;
+  rtc::CopyOnWriteBuffer buffer("foo");
+  transport_pair.second()->SendData(channel_id, params, buffer);
+
+  transport_pair.FlushAsyncInvokes();
+  transport_pair.first()->SetDataSink(nullptr);
+}
+
+TEST(LoopbackMediaTransport, CloseDeliveredToSink) {
+  std::unique_ptr<rtc::Thread> thread = rtc::Thread::Create();
+  thread->Start();
+  MediaTransportPair transport_pair(thread.get());
+
+  MockDataChannelSink first_sink;
+  transport_pair.first()->SetDataSink(&first_sink);
+
+  MockDataChannelSink second_sink;
+  transport_pair.second()->SetDataSink(&second_sink);
+
+  const int channel_id = 1;
+  {
+    testing::InSequence s;
+    EXPECT_CALL(second_sink, OnChannelClosing(channel_id));
+    EXPECT_CALL(second_sink, OnChannelClosed(channel_id));
+    EXPECT_CALL(first_sink, OnChannelClosed(channel_id));
+  }
+
+  transport_pair.first()->CloseChannel(channel_id);
+
+  transport_pair.FlushAsyncInvokes();
+  transport_pair.first()->SetDataSink(nullptr);
+  transport_pair.second()->SetDataSink(nullptr);
+}
+
 }  // namespace webrtc
diff --git a/audio/test/media_transport_test.cc b/audio/test/media_transport_test.cc
index 7c25a72..91677db 100644
--- a/audio/test/media_transport_test.cc
+++ b/audio/test/media_transport_test.cc
@@ -65,7 +65,9 @@
 }  // namespace
 
 TEST(AudioWithMediaTransport, DeliversAudio) {
-  MediaTransportPair transport_pair;
+  std::unique_ptr<rtc::Thread> transport_thread = rtc::Thread::Create();
+  transport_thread->Start();
+  MediaTransportPair transport_pair(transport_thread.get());
   MockTransport rtcp_send_transport;
   MockTransport send_transport;
   std::unique_ptr<RtcEventLog> null_event_log = RtcEventLog::CreateNull();