Data Channel Benchmarking tool

Create a server using:
./data_channel_benchmark --server --port 12345
Start the flow of data from the server to a client using:
./data_channel_benchmark --port 12345 --transfer_size 100
The throughput is reported on the server console.

The negotiation does not require a 3rd party server and is done over a
gRPC transport. No TURN server is configured, so both peers need to be
reachable using STUN only.

Bug: webrtc:13288
Change-Id: Iac9a96cf390ab465ea45a46bf0b40950c56dfceb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235661
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36206}
diff --git a/DEPS b/DEPS
index dc31e4d..99fb48d 100644
--- a/DEPS
+++ b/DEPS
@@ -156,6 +156,9 @@
     'url': 'https://chromium.googlesource.com/chromium/deps/findbugs.git@4275d9ac8610db6b1bc9a5e887f97e41b33fac67',
     'condition': 'checkout_android',
   },
+  'src/third_party/grpc/src': {
+    'url': 'https://chromium.googlesource.com/external/github.com/grpc/grpc.git@f8a909e76fcd947949502832a7ab8e2cba2b8e27',
+  },
   # Used for embedded builds. CrOS & Linux use the system version.
   'src/third_party/fontconfig/src': {
       'url': 'https://chromium.googlesource.com/external/fontconfig.git@452be8125f0e2a18a7dfef469e05d19374d36307',
diff --git a/rtc_tools/BUILD.gn b/rtc_tools/BUILD.gn
index b841228..fa17013 100644
--- a/rtc_tools/BUILD.gn
+++ b/rtc_tools/BUILD.gn
@@ -47,6 +47,9 @@
       ":unpack_aecdump",
     ]
   }
+  if (!build_with_chromium && rtc_enable_grpc) {
+    deps += [ "data_channel_benchmark" ]
+  }
 }
 
 rtc_library("video_file_reader") {
diff --git a/rtc_tools/data_channel_benchmark/BUILD.gn b/rtc_tools/data_channel_benchmark/BUILD.gn
new file mode 100644
index 0000000..c7fbb85
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/BUILD.gn
@@ -0,0 +1,62 @@
+# Copyright 2021 The Chromium Authors. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+import("//third_party/grpc/grpc_library.gni")
+import("../../webrtc.gni")
+
+grpc_library("signaling_grpc_proto") {
+  testonly = true
+  sources = [ "peer_connection_signaling.proto" ]
+}
+
+rtc_library("signaling_interface") {
+  sources = [ "signaling_interface.h" ]
+  deps = [ "../../api:libjingle_peerconnection_api" ]
+}
+
+rtc_library("grpc_signaling") {
+  testonly = true
+  sources = [
+    "grpc_signaling.cc",
+    "grpc_signaling.h",
+  ]
+  deps = [
+    ":signaling_grpc_proto",
+    ":signaling_interface",
+    "../../api:libjingle_peerconnection_api",
+    "../../rtc_base:threading",
+    "//third_party/grpc:grpc++",
+  ]
+
+  defines = [ "GPR_FORBID_UNREACHABLE_CODE=0" ]
+}
+
+rtc_executable("data_channel_benchmark") {
+  testonly = true
+  sources = [
+    "data_channel_benchmark.cc",
+    "peer_connection_client.cc",
+    "peer_connection_client.h",
+  ]
+  deps = [
+    ":grpc_signaling",
+    ":signaling_interface",
+    "../../api:create_peerconnection_factory",
+    "../../api:libjingle_peerconnection_api",
+    "../../api:rtc_error",
+    "../../api:scoped_refptr",
+    "../../api/audio_codecs:builtin_audio_decoder_factory",
+    "../../api/audio_codecs:builtin_audio_encoder_factory",
+    "../../api/video_codecs:builtin_video_decoder_factory",
+    "../../api/video_codecs:builtin_video_encoder_factory",
+    "../../rtc_base",
+    "../../rtc_base:logging",
+    "../../rtc_base:refcount",
+    "../../rtc_base:threading",
+    "../../system_wrappers:field_trial",
+    "//third_party/abseil-cpp/absl/cleanup:cleanup",
+    "//third_party/abseil-cpp/absl/flags:flag",
+    "//third_party/abseil-cpp/absl/flags:parse",
+  ]
+}
diff --git a/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc b/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc
new file mode 100644
index 0000000..57ee8e5
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/data_channel_benchmark.cc
@@ -0,0 +1,323 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ *
+ *  Data Channel Benchmarking tool.
+ *
+ *  Create a server using: ./data_channel_benchmark --server --port 12345
+ *  Start the flow of data from the server to a client using:
+ *  ./data_channel_benchmark --port 12345 --transfer_size 100 --packet_size 8196
+ *  The throughput is reported on the server console.
+ *
+ *  The negotiation does not require a 3rd party server and is done over a gRPC
+ *  transport. No TURN server is configured, so both peers need to be reachable
+ *  using STUN only.
+ */
+#include <inttypes.h>
+
+#include <charconv>
+
+#include "absl/cleanup/cleanup.h"
+#include "absl/flags/flag.h"
+#include "absl/flags/parse.h"
+#include "rtc_base/event.h"
+#include "rtc_base/ssl_adapter.h"
+#include "rtc_base/thread.h"
+#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
+#include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
+#include "system_wrappers/include/field_trial.h"
+
+ABSL_FLAG(int, verbose, 0, "verbosity level (0-5)");
+ABSL_FLAG(bool, server, false, "Server mode");
+ABSL_FLAG(bool, oneshot, true, "Terminate after serving a client");
+ABSL_FLAG(std::string, address, "localhost", "Connect to server address");
+ABSL_FLAG(uint16_t, port, 0, "Connect to port (0 for random)");
+ABSL_FLAG(uint64_t, transfer_size, 2, "Transfer size (MiB)");
+ABSL_FLAG(uint64_t, packet_size, 256 * 1024, "Packet size");
+ABSL_FLAG(std::string,
+          force_fieldtrials,
+          "",
+          "Field trials control experimental feature code which can be forced. "
+          "E.g. running with --force_fieldtrials=WebRTC-FooFeature/Enable/"
+          " will assign the group Enable to field trial WebRTC-FooFeature.");
+
+struct SetupMessage {
+  size_t packet_size;
+  size_t transfer_size;
+
+  std::string ToString() {
+    char buffer[64];
+    rtc::SimpleStringBuilder sb(buffer);
+    sb << packet_size << "," << transfer_size;
+
+    return sb.str();
+  }
+
+  static SetupMessage FromString(absl::string_view sv) {
+    SetupMessage result;
+    auto parameters = rtc::split(sv, ',');
+    std::from_chars(parameters[0].data(),
+                    parameters[0].data() + parameters[0].size(),
+                    result.packet_size, 10);
+    std::from_chars(parameters[1].data(),
+                    parameters[1].data() + parameters[1].size(),
+                    result.transfer_size, 10);
+    return result;
+  }
+};
+
+class DataChannelObserverImpl : public webrtc::DataChannelObserver {
+ public:
+  explicit DataChannelObserverImpl(webrtc::DataChannelInterface* dc)
+      : dc_(dc), bytes_received_(0) {}
+  void OnStateChange() override {
+    RTC_LOG(LS_INFO) << "State changed to " << dc_->state();
+    switch (dc_->state()) {
+      case webrtc::DataChannelInterface::DataState::kOpen:
+        open_event_.Set();
+        break;
+      case webrtc::DataChannelInterface::DataState::kClosed:
+        closed_event_.Set();
+        break;
+      default:
+        break;
+    }
+  }
+  void OnMessage(const webrtc::DataBuffer& buffer) override {
+    bytes_received_ += buffer.data.size();
+    if (bytes_received_threshold_ &&
+        bytes_received_ >= bytes_received_threshold_) {
+      bytes_received_event_.Set();
+    }
+
+    if (setup_message_.empty() && !buffer.binary) {
+      setup_message_.assign(buffer.data.cdata<char>(), buffer.data.size());
+      setup_message_event_.Set();
+    }
+  }
+  void OnBufferedAmountChange(uint64_t sent_data_size) override {
+    if (dc_->buffered_amount() <
+        webrtc::DataChannelInterface::MaxSendQueueSize() / 2)
+      low_buffered_threshold_event_.Set();
+    else
+      low_buffered_threshold_event_.Reset();
+  }
+
+  bool WaitForOpenState(int duration_ms) {
+    return dc_->state() == webrtc::DataChannelInterface::DataState::kOpen ||
+           open_event_.Wait(duration_ms);
+  }
+  bool WaitForClosedState(int duration_ms) {
+    return dc_->state() == webrtc::DataChannelInterface::DataState::kClosed ||
+           closed_event_.Wait(duration_ms);
+  }
+
+  // Set how many received bytes are required until
+  // WaitForBytesReceivedThreshold return true.
+  void SetBytesReceivedThreshold(uint64_t bytes_received_threshold) {
+    bytes_received_threshold_ = bytes_received_threshold;
+    if (bytes_received_ >= bytes_received_threshold_)
+      bytes_received_event_.Set();
+  }
+  // Wait until the received byte count reaches the desired value.
+  bool WaitForBytesReceivedThreshold(int duration_ms) {
+    return (bytes_received_threshold_ &&
+            bytes_received_ >= bytes_received_threshold_) ||
+           bytes_received_event_.Wait(duration_ms);
+  }
+
+  bool WaitForLowbufferedThreshold(int duration_ms) {
+    return low_buffered_threshold_event_.Wait(duration_ms);
+  }
+  std::string SetupMessage() { return setup_message_; }
+  bool WaitForSetupMessage(int duration_ms) {
+    return setup_message_event_.Wait(duration_ms);
+  }
+
+ private:
+  webrtc::DataChannelInterface* dc_;
+  rtc::Event open_event_;
+  rtc::Event closed_event_;
+  rtc::Event bytes_received_event_;
+  absl::optional<uint64_t> bytes_received_threshold_;
+  uint64_t bytes_received_;
+  rtc::Event low_buffered_threshold_event_;
+  std::string setup_message_;
+  rtc::Event setup_message_event_;
+};
+
+int RunServer() {
+  bool oneshot = absl::GetFlag(FLAGS_oneshot);
+  uint16_t port = absl::GetFlag(FLAGS_port);
+
+  auto signaling_thread = rtc::Thread::Create();
+  signaling_thread->Start();
+  {
+    auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
+        signaling_thread.get());
+
+    auto grpc_server = webrtc::GrpcSignalingServerInterface::Create(
+        [factory = rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>(
+             factory)](webrtc::SignalingInterface* signaling) {
+          webrtc::PeerConnectionClient client(factory.get(), signaling);
+          client.StartPeerConnection();
+          auto peer_connection = client.peerConnection();
+
+          // Set up the data channel
+          auto dc_or_error =
+              peer_connection->CreateDataChannelOrError("benchmark", nullptr);
+          auto data_channel = dc_or_error.MoveValue();
+          auto data_channel_observer =
+              std::make_unique<DataChannelObserverImpl>(data_channel);
+          data_channel->RegisterObserver(data_channel_observer.get());
+          absl::Cleanup unregister_observer(
+              [data_channel] { data_channel->UnregisterObserver(); });
+
+          // Wait for a first message from the remote peer.
+          // It configures how much data should be sent and how big the packets
+          // should be.
+          // First message is "packet_size,transfer_size".
+          data_channel_observer->WaitForSetupMessage(rtc::Event::kForever);
+          auto parameters =
+              SetupMessage::FromString(data_channel_observer->SetupMessage());
+
+          // Wait for the sender and receiver peers to stabilize (send all ACKs)
+          // This makes it easier to isolate the sending part when profiling.
+          absl::SleepFor(absl::Seconds(1));
+
+          std::string data(parameters.packet_size, '0');
+          size_t remaining_data = parameters.transfer_size;
+
+          auto begin_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
+
+          while (remaining_data) {
+            if (remaining_data < data.size())
+              data.resize(remaining_data);
+
+            rtc::CopyOnWriteBuffer buffer(data);
+            webrtc::DataBuffer data_buffer(buffer, true);
+            if (!data_channel->Send(data_buffer)) {
+              // If the send() call failed, the buffers are full.
+              // We wait until there's more room.
+              data_channel_observer->WaitForLowbufferedThreshold(
+                  rtc::Event::kForever);
+              continue;
+            }
+            remaining_data -= buffer.size();
+            fprintf(stderr, "Progress: %zu / %zu (%zu%%)\n",
+                    (parameters.transfer_size - remaining_data),
+                    parameters.transfer_size,
+                    (100 - remaining_data * 100 / parameters.transfer_size));
+          }
+
+          // Receiver signals the data channel close event when it has received
+          // all the data it requested.
+          data_channel_observer->WaitForClosedState(rtc::Event::kForever);
+
+          auto end_time = webrtc::Clock::GetRealTimeClock()->CurrentTime();
+          auto duration_ms = (end_time - begin_time).ms<size_t>();
+          double throughput = (parameters.transfer_size / 1024. / 1024.) /
+                              (duration_ms / 1000.);
+          printf("Elapsed time: %zums %gMiB/s\n", duration_ms, throughput);
+        },
+        port, oneshot);
+    grpc_server->Start();
+
+    printf("Server listening on port %d\n", grpc_server->SelectedPort());
+    grpc_server->Wait();
+  }
+
+  signaling_thread->Quit();
+  return 0;
+}
+
+int RunClient() {
+  uint16_t port = absl::GetFlag(FLAGS_port);
+  std::string server_address = absl::GetFlag(FLAGS_address);
+  size_t transfer_size = absl::GetFlag(FLAGS_transfer_size) * 1024 * 1024;
+  size_t packet_size = absl::GetFlag(FLAGS_packet_size);
+
+  auto signaling_thread = rtc::Thread::Create();
+  signaling_thread->Start();
+  {
+    auto factory = webrtc::PeerConnectionClient::CreateDefaultFactory(
+        signaling_thread.get());
+    auto grpc_client = webrtc::GrpcSignalingClientInterface::Create(
+        server_address + ":" + std::to_string(port));
+    webrtc::PeerConnectionClient client(factory.get(),
+                                        grpc_client->signaling_client());
+
+    // Set up the callback to receive the data channel from the sender.
+    rtc::scoped_refptr<webrtc::DataChannelInterface> data_channel;
+    rtc::Event got_data_channel;
+    client.SetOnDataChannel(
+        [&data_channel, &got_data_channel](
+            rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+          data_channel = channel;
+          got_data_channel.Set();
+        });
+
+    // Connect to the server.
+    if (!grpc_client->Start()) {
+      fprintf(stderr, "Failed to connect to server\n");
+      return 1;
+    }
+
+    // Wait for the data channel to be received
+    got_data_channel.Wait(rtc::Event::kForever);
+
+    // DataChannel needs an observer to start draining the read queue
+    DataChannelObserverImpl observer(data_channel.get());
+    observer.SetBytesReceivedThreshold(transfer_size);
+    data_channel->RegisterObserver(&observer);
+    absl::Cleanup unregister_observer(
+        [data_channel] { data_channel->UnregisterObserver(); });
+
+    // Send a configuration string to the server to tell it to send
+    // 'packet_size' bytes packets and send a total of 'transfer_size' MB.
+    observer.WaitForOpenState(rtc::Event::kForever);
+    SetupMessage setup_message = {
+        .packet_size = packet_size,
+        .transfer_size = transfer_size,
+    };
+    if (!data_channel->Send(webrtc::DataBuffer(setup_message.ToString()))) {
+      fprintf(stderr, "Failed to send parameter string\n");
+      return 1;
+    }
+
+    // Wait until we have received all the data
+    observer.WaitForBytesReceivedThreshold(rtc::Event::kForever);
+
+    // Close the data channel, signaling to the server we have received
+    // all the requested data.
+    data_channel->Close();
+  }
+
+  signaling_thread->Quit();
+
+  return 0;
+}
+
+int main(int argc, char** argv) {
+  rtc::InitializeSSL();
+  absl::ParseCommandLine(argc, argv);
+
+  // Make sure that higher severity number means more logs by reversing the
+  // rtc::LoggingSeverity values.
+  auto logging_severity =
+      std::max(0, rtc::LS_NONE - absl::GetFlag(FLAGS_verbose));
+  rtc::LogMessage::LogToDebug(
+      static_cast<rtc::LoggingSeverity>(logging_severity));
+
+  bool is_server = absl::GetFlag(FLAGS_server);
+  std::string field_trials = absl::GetFlag(FLAGS_force_fieldtrials);
+
+  webrtc::field_trial::InitFieldTrialsFromString(field_trials.c_str());
+
+  return is_server ? RunServer() : RunClient();
+}
diff --git a/rtc_tools/data_channel_benchmark/grpc_signaling.cc b/rtc_tools/data_channel_benchmark/grpc_signaling.cc
new file mode 100644
index 0000000..8db717f
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/grpc_signaling.cc
@@ -0,0 +1,267 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "rtc_tools/data_channel_benchmark/grpc_signaling.h"
+
+#include <grpc/support/log.h>
+#include <grpcpp/grpcpp.h>
+
+#include <string>
+#include <utility>
+
+#include "api/jsep.h"
+#include "api/jsep_ice_candidate.h"
+#include "rtc_base/thread.h"
+#include "rtc_tools/data_channel_benchmark/peer_connection_signaling.grpc.pb.h"
+
+namespace webrtc {
+namespace {
+
+using GrpcSignaling::IceCandidate;
+using GrpcSignaling::PeerConnectionSignaling;
+using GrpcSignaling::SessionDescription;
+using GrpcSignaling::SignalingMessage;
+
+template <class T>
+class SessionData : public webrtc::SignalingInterface {
+ public:
+  SessionData() {}
+  explicit SessionData(T* stream) : stream_(stream) {}
+  void SetStream(T* stream) { stream_ = stream; }
+
+  void SendIceCandidate(const IceCandidateInterface* candidate) override {
+    RTC_LOG(LS_INFO) << "SendIceCandidate";
+    std::string serialized_candidate;
+    if (!candidate->ToString(&serialized_candidate)) {
+      RTC_LOG(LS_ERROR) << "Failed to serialize ICE candidate";
+      return;
+    }
+
+    SignalingMessage message;
+    IceCandidate* proto_candidate = message.mutable_candidate();
+    proto_candidate->set_description(serialized_candidate);
+    proto_candidate->set_mid(candidate->sdp_mid());
+    proto_candidate->set_mline_index(candidate->sdp_mline_index());
+
+    stream_->Write(message);
+  }
+
+  void SendDescription(const SessionDescriptionInterface* sdp) override {
+    RTC_LOG(LS_INFO) << "SendDescription";
+
+    std::string serialized_sdp;
+    sdp->ToString(&serialized_sdp);
+
+    SignalingMessage message;
+    if (sdp->GetType() == SdpType::kOffer)
+      message.mutable_description()->set_type(SessionDescription::OFFER);
+    else if (sdp->GetType() == SdpType::kAnswer)
+      message.mutable_description()->set_type(SessionDescription::ANSWER);
+    message.mutable_description()->set_content(serialized_sdp);
+
+    stream_->Write(message);
+  }
+
+  void OnRemoteDescription(
+      std::function<void(std::unique_ptr<SessionDescriptionInterface> sdp)>
+          callback) override {
+    RTC_LOG(LS_INFO) << "OnRemoteDescription";
+    remote_description_callback_ = callback;
+  }
+
+  void OnIceCandidate(
+      std::function<void(std::unique_ptr<IceCandidateInterface> candidate)>
+          callback) override {
+    RTC_LOG(LS_INFO) << "OnIceCandidate";
+    ice_candidate_callback_ = callback;
+  }
+
+  T* stream_;
+
+  std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>)>
+      ice_candidate_callback_;
+  std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>)>
+      remote_description_callback_;
+};
+
+using ServerSessionData =
+    SessionData<grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>>;
+using ClientSessionData =
+    SessionData<grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>;
+
+template <class MessageType, class StreamReader, class SessionData>
+void ProcessMessages(StreamReader* stream, SessionData* session) {
+  MessageType message;
+
+  while (stream->Read(&message)) {
+    switch (message.Content_case()) {
+      case SignalingMessage::ContentCase::kCandidate: {
+        webrtc::SdpParseError error;
+        auto jsep_candidate = std::make_unique<webrtc::JsepIceCandidate>(
+            message.candidate().mid(), message.candidate().mline_index());
+        if (!jsep_candidate->Initialize(message.candidate().description(),
+                                        &error)) {
+          RTC_LOG(LS_ERROR) << "Failed to deserialize ICE candidate '"
+                            << message.candidate().description() << "'";
+          RTC_LOG(LS_ERROR)
+              << "Error at line " << error.line << ":" << error.description;
+          continue;
+        }
+
+        session->ice_candidate_callback_(std::move(jsep_candidate));
+        break;
+      }
+      case SignalingMessage::ContentCase::kDescription: {
+        auto& description = message.description();
+        auto content = description.content();
+
+        auto sdp = webrtc::CreateSessionDescription(
+            description.type() == SessionDescription::OFFER
+                ? webrtc::SdpType::kOffer
+                : webrtc::SdpType::kAnswer,
+            description.content());
+        session->remote_description_callback_(std::move(sdp));
+        break;
+      }
+      default:
+        RTC_DCHECK_NOTREACHED();
+    }
+  }
+}
+
+class GrpcNegotiationServer : public GrpcSignalingServerInterface,
+                              public PeerConnectionSignaling::Service {
+ public:
+  GrpcNegotiationServer(
+      std::function<void(webrtc::SignalingInterface*)> callback,
+      int port,
+      bool oneshot)
+      : connect_callback_(std::move(callback)),
+        requested_port_(port),
+        oneshot_(oneshot) {}
+  ~GrpcNegotiationServer() override {
+    Stop();
+    if (server_stop_thread_)
+      server_stop_thread_->Stop();
+  }
+
+  void Start() override {
+    std::string server_address = "[::]";
+
+    grpc::ServerBuilder builder;
+    builder.AddListeningPort(
+        server_address + ":" + std::to_string(requested_port_),
+        grpc::InsecureServerCredentials(), &selected_port_);
+    builder.RegisterService(this);
+    server_ = builder.BuildAndStart();
+  }
+
+  void Wait() override { server_->Wait(); }
+
+  void Stop() override { server_->Shutdown(); }
+
+  int SelectedPort() override { return selected_port_; }
+
+  grpc::Status Connect(
+      grpc::ServerContext* context,
+      grpc::ServerReaderWriter<SignalingMessage, SignalingMessage>* stream)
+      override {
+    if (oneshot_) {
+      // Request the termination of the server early so we don't serve another
+      // client in parallel.
+      server_stop_thread_ = rtc::Thread::Create();
+      server_stop_thread_->Start();
+      server_stop_thread_->PostTask([this] { Stop(); });
+    }
+
+    ServerSessionData session(stream);
+
+    auto reading_thread = rtc::Thread::Create();
+    reading_thread->Start();
+    reading_thread->PostTask([&session, &stream] {
+      ProcessMessages<SignalingMessage>(stream, &session);
+    });
+
+    connect_callback_(&session);
+
+    reading_thread->Stop();
+
+    return grpc::Status::OK;
+  }
+
+ private:
+  std::function<void(webrtc::SignalingInterface*)> connect_callback_;
+  int requested_port_;
+  int selected_port_;
+  bool oneshot_;
+
+  std::unique_ptr<grpc::Server> server_;
+  std::unique_ptr<rtc::Thread> server_stop_thread_;
+};
+
+class GrpcNegotiationClient : public GrpcSignalingClientInterface {
+ public:
+  explicit GrpcNegotiationClient(const std::string& server) {
+    channel_ = grpc::CreateChannel(server, grpc::InsecureChannelCredentials());
+    stub_ = PeerConnectionSignaling::NewStub(channel_);
+  }
+
+  ~GrpcNegotiationClient() override {
+    context_.TryCancel();
+    if (reading_thread_)
+      reading_thread_->Stop();
+  }
+
+  bool Start() override {
+    if (!channel_->WaitForConnected(
+            absl::ToChronoTime(absl::Now() + absl::Seconds(3)))) {
+      return false;
+    }
+
+    stream_ = stub_->Connect(&context_);
+    session_.SetStream(stream_.get());
+
+    reading_thread_ = rtc::Thread::Create();
+    reading_thread_->Start();
+    reading_thread_->PostTask([this] {
+      ProcessMessages<SignalingMessage>(stream_.get(), &session_);
+    });
+
+    return true;
+  }
+
+  webrtc::SignalingInterface* signaling_client() override { return &session_; }
+
+ private:
+  std::shared_ptr<grpc::Channel> channel_;
+  std::unique_ptr<PeerConnectionSignaling::Stub> stub_;
+  std::unique_ptr<rtc::Thread> reading_thread_;
+  grpc::ClientContext context_;
+  std::unique_ptr<
+      ::grpc::ClientReaderWriter<SignalingMessage, SignalingMessage>>
+      stream_;
+  ClientSessionData session_;
+};
+}  // namespace
+
+std::unique_ptr<GrpcSignalingServerInterface>
+GrpcSignalingServerInterface::Create(
+    std::function<void(webrtc::SignalingInterface*)> callback,
+    int port,
+    bool oneshot) {
+  return std::make_unique<GrpcNegotiationServer>(std::move(callback), port,
+                                                 oneshot);
+}
+
+std::unique_ptr<GrpcSignalingClientInterface>
+GrpcSignalingClientInterface::Create(const std::string& server) {
+  return std::make_unique<GrpcNegotiationClient>(server);
+}
+
+}  // namespace webrtc
diff --git a/rtc_tools/data_channel_benchmark/grpc_signaling.h b/rtc_tools/data_channel_benchmark/grpc_signaling.h
new file mode 100644
index 0000000..15799d2
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/grpc_signaling.h
@@ -0,0 +1,64 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_
+#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_
+
+#include <memory>
+#include <string>
+
+#include "api/jsep.h"
+#include "rtc_tools/data_channel_benchmark/signaling_interface.h"
+
+namespace webrtc {
+
+// This class defines a server enabling clients to perform a PeerConnection
+// negotiation directly over gRPC.
+// When a client connects, a callback is run to handle the request.
+class GrpcSignalingServerInterface {
+ public:
+  virtual ~GrpcSignalingServerInterface() = default;
+
+  // Start listening for connections.
+  virtual void Start() = 0;
+
+  // Wait for the gRPC server to terminate.
+  virtual void Wait() = 0;
+
+  // Stop the gRPC server instance.
+  virtual void Stop() = 0;
+
+  // The port the server is listening on.
+  virtual int SelectedPort() = 0;
+
+  // Create a gRPC server listening on |port| that will run |callback| on each
+  // request. If |oneshot| is true, it will terminate after serving one request.
+  static std::unique_ptr<GrpcSignalingServerInterface> Create(
+      std::function<void(webrtc::SignalingInterface*)> callback,
+      int port,
+      bool oneshot);
+};
+
+// This class defines a client that can connect to a server and perform a
+// PeerConnection negotiation directly over gRPC.
+class GrpcSignalingClientInterface {
+ public:
+  virtual ~GrpcSignalingClientInterface() = default;
+
+  // Connect the client to the gRPC server.
+  virtual bool Start() = 0;
+  virtual webrtc::SignalingInterface* signaling_client() = 0;
+
+  // Create a client to connnect to a server at |server_address|.
+  static std::unique_ptr<GrpcSignalingClientInterface> Create(
+      const std::string& server_address);
+};
+
+}  // namespace webrtc
+#endif  // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_GRPC_SIGNALING_H_
diff --git a/rtc_tools/data_channel_benchmark/peer_connection_client.cc b/rtc_tools/data_channel_benchmark/peer_connection_client.cc
new file mode 100644
index 0000000..6d2ee81
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/peer_connection_client.cc
@@ -0,0 +1,301 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#include "rtc_tools/data_channel_benchmark/peer_connection_client.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "api/audio_codecs/builtin_audio_decoder_factory.h"
+#include "api/audio_codecs/builtin_audio_encoder_factory.h"
+#include "api/create_peerconnection_factory.h"
+#include "api/jsep.h"
+#include "api/peer_connection_interface.h"
+#include "api/rtc_error.h"
+#include "api/scoped_refptr.h"
+#include "api/set_remote_description_observer_interface.h"
+#include "api/video_codecs/builtin_video_decoder_factory.h"
+#include "api/video_codecs/builtin_video_encoder_factory.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/thread.h"
+
+namespace {
+
+constexpr char kStunServer[] = "stun:stun.l.google.com:19302";
+
+class SetLocalDescriptionObserverAdapter
+    : public webrtc::SetLocalDescriptionObserverInterface {
+ public:
+  using Callback = std::function<void(webrtc::RTCError)>;
+  static rtc::scoped_refptr<SetLocalDescriptionObserverAdapter> Create(
+      Callback callback) {
+    return rtc::scoped_refptr<SetLocalDescriptionObserverAdapter>(
+        new rtc::RefCountedObject<SetLocalDescriptionObserverAdapter>(
+            std::move(callback)));
+  }
+
+  explicit SetLocalDescriptionObserverAdapter(Callback callback)
+      : callback_(std::move(callback)) {}
+  ~SetLocalDescriptionObserverAdapter() override = default;
+
+ private:
+  void OnSetLocalDescriptionComplete(webrtc::RTCError error) override {
+    callback_(std::move(error));
+  }
+
+  Callback callback_;
+};
+
+class SetRemoteDescriptionObserverAdapter
+    : public webrtc::SetRemoteDescriptionObserverInterface {
+ public:
+  using Callback = std::function<void(webrtc::RTCError)>;
+  static rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter> Create(
+      Callback callback) {
+    return rtc::scoped_refptr<SetRemoteDescriptionObserverAdapter>(
+        new rtc::RefCountedObject<SetRemoteDescriptionObserverAdapter>(
+            std::move(callback)));
+  }
+
+  explicit SetRemoteDescriptionObserverAdapter(Callback callback)
+      : callback_(std::move(callback)) {}
+  ~SetRemoteDescriptionObserverAdapter() override = default;
+
+ private:
+  void OnSetRemoteDescriptionComplete(webrtc::RTCError error) override {
+    callback_(std::move(error));
+  }
+
+  Callback callback_;
+};
+
+class CreateSessionDescriptionObserverAdapter
+    : public webrtc::CreateSessionDescriptionObserver {
+ public:
+  using Success = std::function<void(webrtc::SessionDescriptionInterface*)>;
+  using Failure = std::function<void(webrtc::RTCError)>;
+
+  static rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter> Create(
+      Success success,
+      Failure failure) {
+    return rtc::scoped_refptr<CreateSessionDescriptionObserverAdapter>(
+        new rtc::RefCountedObject<CreateSessionDescriptionObserverAdapter>(
+            std::move(success), std::move(failure)));
+  }
+
+  CreateSessionDescriptionObserverAdapter(Success success, Failure failure)
+      : success_(std::move(success)), failure_(std::move(failure)) {}
+  ~CreateSessionDescriptionObserverAdapter() override = default;
+
+ private:
+  void OnSuccess(webrtc::SessionDescriptionInterface* desc) override {
+    success_(desc);
+  }
+
+  void OnFailure(webrtc::RTCError error) override {
+    failure_(std::move(error));
+  }
+
+  Success success_;
+  Failure failure_;
+};
+
+}  // namespace
+
+namespace webrtc {
+
+PeerConnectionClient::PeerConnectionClient(
+    webrtc::PeerConnectionFactoryInterface* factory,
+    webrtc::SignalingInterface* signaling)
+    : signaling_(signaling) {
+  signaling_->OnIceCandidate(
+      [&](std::unique_ptr<webrtc::IceCandidateInterface> candidate) {
+        AddIceCandidate(std::move(candidate));
+      });
+  signaling_->OnRemoteDescription(
+      [&](std::unique_ptr<webrtc::SessionDescriptionInterface> sdp) {
+        SetRemoteDescription(std::move(sdp));
+      });
+  InitializePeerConnection(factory);
+}
+
+PeerConnectionClient::~PeerConnectionClient() {
+  Disconnect();
+}
+
+rtc::scoped_refptr<PeerConnectionFactoryInterface>
+PeerConnectionClient::CreateDefaultFactory(rtc::Thread* signaling_thread) {
+  auto factory = webrtc::CreatePeerConnectionFactory(
+      /*network_thread=*/nullptr, /*worker_thread=*/nullptr,
+      /*signaling_thread*/ signaling_thread,
+      /*default_adm=*/nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
+      webrtc::CreateBuiltinAudioDecoderFactory(),
+      webrtc::CreateBuiltinVideoEncoderFactory(),
+      webrtc::CreateBuiltinVideoDecoderFactory(),
+      /*audio_mixer=*/nullptr, /*audio_processing=*/nullptr);
+
+  if (!factory) {
+    RTC_LOG(LS_ERROR) << "Failed to initialize PeerConnectionFactory";
+    return nullptr;
+  }
+
+  return factory;
+}
+
+bool PeerConnectionClient::InitializePeerConnection(
+    webrtc::PeerConnectionFactoryInterface* factory) {
+  RTC_CHECK(factory)
+      << "Must call InitializeFactory before InitializePeerConnection";
+
+  webrtc::PeerConnectionInterface::RTCConfiguration config;
+  webrtc::PeerConnectionInterface::IceServer server;
+  server.urls.push_back(kStunServer);
+  config.servers.push_back(server);
+  config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
+  config.enable_dtls_srtp = true;
+
+  webrtc::PeerConnectionDependencies dependencies(this);
+  auto result =
+      factory->CreatePeerConnectionOrError(config, std::move(dependencies));
+
+  if (!result.ok()) {
+    RTC_LOG(LS_ERROR) << "Failed to create PeerConnection: "
+                      << result.error().message();
+    DeletePeerConnection();
+    return false;
+  }
+  peer_connection_ = result.MoveValue();
+  RTC_LOG(LS_INFO) << "PeerConnection created successfully";
+  return true;
+}
+
+bool PeerConnectionClient::StartPeerConnection() {
+  RTC_LOG(LS_INFO) << "Creating offer";
+
+  peer_connection_->SetLocalDescription(
+      SetLocalDescriptionObserverAdapter::Create([this](
+                                                     webrtc::RTCError error) {
+        if (error.ok())
+          signaling_->SendDescription(peer_connection_->local_description());
+      }));
+
+  return true;
+}
+
+bool PeerConnectionClient::IsConnected() {
+  return peer_connection_->peer_connection_state() ==
+         webrtc::PeerConnectionInterface::PeerConnectionState::kConnected;
+}
+
+// Disconnect from the call.
+void PeerConnectionClient::Disconnect() {
+  for (auto& data_channel : data_channels_) {
+    data_channel->Close();
+    data_channel.release();
+  }
+  data_channels_.clear();
+  DeletePeerConnection();
+}
+
+// Delete the WebRTC PeerConnection.
+void PeerConnectionClient::DeletePeerConnection() {
+  RTC_LOG(LS_INFO);
+
+  if (peer_connection_) {
+    peer_connection_->Close();
+  }
+  peer_connection_.release();
+}
+
+void PeerConnectionClient::OnIceConnectionChange(
+    webrtc::PeerConnectionInterface::IceConnectionState new_state) {
+  if (new_state == webrtc::PeerConnectionInterface::IceConnectionState::
+                       kIceConnectionCompleted) {
+    RTC_LOG(LS_INFO) << "State is updating to connected";
+  } else if (new_state == webrtc::PeerConnectionInterface::IceConnectionState::
+                              kIceConnectionDisconnected) {
+    RTC_LOG(LS_INFO) << "Disconnecting from peer";
+    Disconnect();
+  }
+}
+
+void PeerConnectionClient::OnIceGatheringChange(
+    webrtc::PeerConnectionInterface::IceGatheringState new_state) {
+  if (new_state == webrtc::PeerConnectionInterface::kIceGatheringComplete) {
+    RTC_LOG(LS_INFO) << "Client is ready to receive remote SDP";
+  }
+}
+
+void PeerConnectionClient::OnIceCandidate(
+    const webrtc::IceCandidateInterface* candidate) {
+  signaling_->SendIceCandidate(candidate);
+}
+
+void PeerConnectionClient::OnDataChannel(
+    rtc::scoped_refptr<webrtc::DataChannelInterface> channel) {
+  RTC_LOG(LS_INFO) << __FUNCTION__ << " remote datachannel created";
+  if (on_data_channel_callback_)
+    on_data_channel_callback_(channel);
+  data_channels_.push_back(channel);
+}
+
+void PeerConnectionClient::SetOnDataChannel(
+    std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
+        callback) {
+  on_data_channel_callback_ = callback;
+}
+
+void PeerConnectionClient::OnNegotiationNeededEvent(uint32_t event_id) {
+  RTC_LOG(LS_INFO) << "OnNegotiationNeededEvent";
+
+  peer_connection_->SetLocalDescription(
+      SetLocalDescriptionObserverAdapter::Create([this](
+                                                     webrtc::RTCError error) {
+        if (error.ok())
+          signaling_->SendDescription(peer_connection_->local_description());
+      }));
+}
+
+bool PeerConnectionClient::SetRemoteDescription(
+    std::unique_ptr<webrtc::SessionDescriptionInterface> desc) {
+  RTC_LOG(LS_INFO) << "SetRemoteDescription";
+  auto type = desc->GetType();
+
+  peer_connection_->SetRemoteDescription(
+      std::move(desc),
+      SetRemoteDescriptionObserverAdapter::Create([&](webrtc::RTCError) {
+        RTC_LOG(LS_INFO) << "SetRemoteDescription done";
+
+        if (type == webrtc::SdpType::kOffer) {
+          // Got an offer from the remote, need to set an answer and send it.
+          peer_connection_->SetLocalDescription(
+              SetLocalDescriptionObserverAdapter::Create(
+                  [this](webrtc::RTCError error) {
+                    if (error.ok())
+                      signaling_->SendDescription(
+                          peer_connection_->local_description());
+                  }));
+        }
+      }));
+
+  return true;
+}
+
+void PeerConnectionClient::AddIceCandidate(
+    std::unique_ptr<webrtc::IceCandidateInterface> candidate) {
+  RTC_LOG(LS_INFO) << "AddIceCandidate";
+
+  peer_connection_->AddIceCandidate(
+      std::move(candidate), [](const webrtc::RTCError& error) {
+        RTC_LOG(LS_INFO) << "Failed to add candidate: " << error.message();
+      });
+}
+
+}  // namespace webrtc
diff --git a/rtc_tools/data_channel_benchmark/peer_connection_client.h b/rtc_tools/data_channel_benchmark/peer_connection_client.h
new file mode 100644
index 0000000..a9787fe
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/peer_connection_client.h
@@ -0,0 +1,108 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_
+#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_
+
+#include <stdint.h>
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "api/jsep.h"
+#include "api/peer_connection_interface.h"
+#include "api/rtp_receiver_interface.h"
+#include "api/scoped_refptr.h"
+#include "api/set_local_description_observer_interface.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/ref_counted_object.h"
+#include "rtc_base/thread.h"
+#include "rtc_tools/data_channel_benchmark/signaling_interface.h"
+
+namespace webrtc {
+
+// Handles all the details for creating a PeerConnection and negotiation using a
+// SignalingInterface object.
+class PeerConnectionClient : public webrtc::PeerConnectionObserver {
+ public:
+  explicit PeerConnectionClient(webrtc::PeerConnectionFactoryInterface* factory,
+                                webrtc::SignalingInterface* signaling);
+
+  ~PeerConnectionClient() override;
+
+  PeerConnectionClient(const PeerConnectionClient&) = delete;
+  PeerConnectionClient& operator=(const PeerConnectionClient&) = delete;
+
+  // Set the local description and send offer using the SignalingInterface,
+  // initiating the negotiation process.
+  bool StartPeerConnection();
+
+  // Whether the peer connection is connected to the remote peer.
+  bool IsConnected();
+
+  // Disconnect from the call.
+  void Disconnect();
+
+  rtc::scoped_refptr<webrtc::PeerConnectionInterface> peerConnection() {
+    return peer_connection_;
+  }
+
+  // Set a callback to run when a DataChannel is created by the remote peer.
+  void SetOnDataChannel(
+      std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
+          callback);
+
+  std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>>&
+  dataChannels() {
+    return data_channels_;
+  }
+
+  // Creates a default PeerConnectionFactory object.
+  static rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
+  CreateDefaultFactory(rtc::Thread* signaling_thread);
+
+ private:
+  void AddIceCandidate(
+      std::unique_ptr<webrtc::IceCandidateInterface> candidate);
+  bool SetRemoteDescription(
+      std::unique_ptr<webrtc::SessionDescriptionInterface> desc);
+
+  // Initialize the PeerConnection with a given PeerConnectionFactory.
+  bool InitializePeerConnection(
+      webrtc::PeerConnectionFactoryInterface* factory);
+  void DeletePeerConnection();
+
+  // PeerConnectionObserver implementation.
+  void OnSignalingChange(
+      webrtc::PeerConnectionInterface::SignalingState new_state) override {
+    RTC_LOG(LS_INFO) << __FUNCTION__ << " new state: " << new_state;
+  }
+  void OnDataChannel(
+      rtc::scoped_refptr<webrtc::DataChannelInterface> channel) override;
+  void OnNegotiationNeededEvent(uint32_t event_id) override;
+  void OnIceConnectionChange(
+      webrtc::PeerConnectionInterface::IceConnectionState new_state) override;
+  void OnIceGatheringChange(
+      webrtc::PeerConnectionInterface::IceGatheringState new_state) override;
+  void OnIceCandidate(const webrtc::IceCandidateInterface* candidate) override;
+  void OnIceConnectionReceivingChange(bool receiving) override {
+    RTC_LOG(LS_INFO) << __FUNCTION__ << " receiving? " << receiving;
+  }
+
+  rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection_;
+  std::function<void(rtc::scoped_refptr<webrtc::DataChannelInterface>)>
+      on_data_channel_callback_;
+  std::vector<rtc::scoped_refptr<webrtc::DataChannelInterface>> data_channels_;
+  webrtc::SignalingInterface* signaling_;
+};
+
+}  // namespace webrtc
+
+#endif  // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_PEER_CONNECTION_CLIENT_H_
diff --git a/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto b/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto
new file mode 100644
index 0000000..9bd0aae9
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/peer_connection_signaling.proto
@@ -0,0 +1,29 @@
+syntax = "proto3";
+
+package webrtc.GrpcSignaling;
+
+service PeerConnectionSignaling {
+  rpc Connect(stream SignalingMessage) returns (stream SignalingMessage) {}
+}
+
+message SignalingMessage {
+  oneof Content {
+    SessionDescription description = 1;
+    IceCandidate candidate = 2;
+  }
+}
+
+message SessionDescription {
+  enum SessionDescriptionType {
+    OFFER = 0;
+    ANSWER = 1;
+  }
+  SessionDescriptionType type = 1;
+  string content = 2;
+}
+
+message IceCandidate {
+  string mid = 1;
+  int32 mline_index = 2;
+  string description = 3;
+}
\ No newline at end of file
diff --git a/rtc_tools/data_channel_benchmark/signaling_interface.h b/rtc_tools/data_channel_benchmark/signaling_interface.h
new file mode 100644
index 0000000..77c811a
--- /dev/null
+++ b/rtc_tools/data_channel_benchmark/signaling_interface.h
@@ -0,0 +1,42 @@
+/*
+ *  Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+#ifndef RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_
+#define RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_
+
+#include <memory>
+
+#include "api/jsep.h"
+
+namespace webrtc {
+class SignalingInterface {
+ public:
+  virtual ~SignalingInterface() = default;
+
+  // Send an ICE candidate over the transport.
+  virtual void SendIceCandidate(
+      const webrtc::IceCandidateInterface* candidate) = 0;
+
+  // Send a local description over the transport.
+  virtual void SendDescription(
+      const webrtc::SessionDescriptionInterface* sdp) = 0;
+
+  // Set a callback when receiving a description from the transport.
+  virtual void OnRemoteDescription(
+      std::function<void(std::unique_ptr<webrtc::SessionDescriptionInterface>
+                             sdp)> callback) = 0;
+
+  // Set a callback when receiving an ICE candidate from the transport.
+  virtual void OnIceCandidate(
+      std::function<void(std::unique_ptr<webrtc::IceCandidateInterface>
+                             candidate)> callback) = 0;
+};
+}  // namespace webrtc
+
+#endif  // RTC_TOOLS_DATA_CHANNEL_BENCHMARK_SIGNALING_INTERFACE_H_
diff --git a/tools_webrtc/sanitizers/tsan_suppressions_webrtc.cc b/tools_webrtc/sanitizers/tsan_suppressions_webrtc.cc
index 6cbdf0b..ae42d53 100644
--- a/tools_webrtc/sanitizers/tsan_suppressions_webrtc.cc
+++ b/tools_webrtc/sanitizers/tsan_suppressions_webrtc.cc
@@ -69,6 +69,9 @@
     // http://crbug.com/244856
     "race:libpulsecommon*.so\n"
 
+    // https://crbug.com/1158622
+    "race:absl::synchronization_internal::Waiter::Post\n"
+
     // End of suppressions.
     ;  // Please keep this semicolon.
 
diff --git a/webrtc.gni b/webrtc.gni
index b81f160..c4a62fc 100644
--- a/webrtc.gni
+++ b/webrtc.gni
@@ -308,6 +308,9 @@
 
   # Enable the usrsctp backend for DataChannels and related unittests
   rtc_build_usrsctp = !build_with_mozilla && rtc_enable_sctp
+
+  # Enable gRPC used for negotiation in multiprocess tests
+  rtc_enable_grpc = rtc_enable_protobuf && (is_linux || is_mac)
 }
 
 # Make it possible to provide custom locations for some libraries (move these