Introduce network layer.

This CL contains network emulation layer and is a first part of landing
CL https://webrtc-review.googlesource.com/c/src/+/116663

Bug: webrtc:10138
Change-Id: If664b21e9df847aef8144d622d08fc7e9f6608da
Reviewed-on: https://webrtc-review.googlesource.com/c/120406
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26470}
diff --git a/BUILD.gn b/BUILD.gn
index d9c9d39..8243281 100644
--- a/BUILD.gn
+++ b/BUILD.gn
@@ -487,6 +487,7 @@
       "rtc_base:sigslot_unittest",
       "rtc_base:weak_ptr_unittests",
       "rtc_base/experiments:experiments_unittests",
+      "test/scenario/network:network_emulation_unittests",
     ]
 
     if (rtc_enable_protobuf) {
diff --git a/test/DEPS b/test/DEPS
index f2bded5..c7ae24f 100644
--- a/test/DEPS
+++ b/test/DEPS
@@ -49,4 +49,9 @@
     "+pc",
     "+p2p",
   ],
+  ".*network_emulation_pc_unittest\.cc": [
+    "+pc/peer_connection_wrapper.h",
+    "+pc/test/mock_peer_connection_observers.h",
+    "+p2p/client/basic_port_allocator.h",
+  ],
 }
diff --git a/test/scenario/network/BUILD.gn b/test/scenario/network/BUILD.gn
index 7df99ec..c73a889 100644
--- a/test/scenario/network/BUILD.gn
+++ b/test/scenario/network/BUILD.gn
@@ -9,15 +9,91 @@
 import("../../../webrtc.gni")
 
 rtc_source_set("emulated_network") {
+  testonly = true
   sources = [
+    "fake_network_socket.cc",
+    "fake_network_socket.h",
+    "fake_network_socket_server.cc",
+    "fake_network_socket_server.h",
     "network_emulation.cc",
     "network_emulation.h",
+    "network_emulation_manager.cc",
+    "network_emulation_manager.h",
   ]
   deps = [
     "../../../api:simulated_network_api",
+    "../../../api/units:data_rate",
+    "../../../api/units:data_size",
+    "../../../api/units:time_delta",
     "../../../api/units:timestamp",
     "../../../rtc_base:rtc_base",
+    "../../../rtc_base:rtc_task_queue_api",
+    "../../../rtc_base:safe_minmax",
+    "../../../rtc_base/task_utils:repeating_task",
+    "../../../rtc_base/third_party/sigslot:sigslot",
+    "../../../system_wrappers:system_wrappers",
     "//third_party/abseil-cpp/absl/memory:memory",
     "//third_party/abseil-cpp/absl/types:optional",
   ]
 }
+
+rtc_source_set("network_emulation_unittest") {
+  testonly = true
+  sources = [
+    "network_emulation_unittest.cc",
+  ]
+  deps = [
+    ":emulated_network",
+    "../../../api:simulated_network_api",
+    "../../../call:simulated_network",
+    "../../../rtc_base:logging",
+    "../../../rtc_base:rtc_event",
+    "../../../test:test_support",
+    "//third_party/abseil-cpp/absl/memory:memory",
+  ]
+}
+
+rtc_source_set("network_emulation_pc_unittest") {
+  testonly = true
+  sources = [
+    "network_emulation_pc_unittest.cc",
+  ]
+  deps = [
+    ":emulated_network",
+    "../../../api:callfactory_api",
+    "../../../api:libjingle_peerconnection_api",
+    "../../../api:scoped_refptr",
+    "../../../api:simulated_network_api",
+    "../../../api/audio_codecs:builtin_audio_decoder_factory",
+    "../../../api/audio_codecs:builtin_audio_encoder_factory",
+    "../../../api/video_codecs:builtin_video_decoder_factory",
+    "../../../api/video_codecs:builtin_video_encoder_factory",
+    "../../../call:simulated_network",
+    "../../../logging:rtc_event_log_impl_base",
+    "../../../media:rtc_audio_video",
+    "../../../modules/audio_device:audio_device_impl",
+    "../../../p2p:rtc_p2p",
+    "../../../pc:pc_test_utils",
+    "../../../pc:peerconnection_wrapper",
+    "../../../rtc_base:gunit_helpers",
+    "../../../rtc_base:logging",
+    "../../../rtc_base:rtc_base",
+    "../../../rtc_base:rtc_base_tests_utils",
+    "../../../rtc_base:rtc_event",
+    "../../../test:test_support",
+    "//third_party/abseil-cpp/absl/memory:memory",
+  ]
+
+  if (!build_with_chromium && is_clang) {
+    # Suppress warnings from the Chromium Clang plugin (bugs.webrtc.org/163).
+    suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
+  }
+}
+
+rtc_source_set("network_emulation_unittests") {
+  testonly = true
+  deps = [
+    ":network_emulation_pc_unittest",
+    ":network_emulation_unittest",
+  ]
+}
diff --git a/test/scenario/network/fake_network_socket.cc b/test/scenario/network/fake_network_socket.cc
new file mode 100644
index 0000000..5d4ad3f
--- /dev/null
+++ b/test/scenario/network/fake_network_socket.cc
@@ -0,0 +1,219 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/scenario/network/fake_network_socket.h"
+
+#include <algorithm>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "rtc_base/logging.h"
+#include "rtc_base/thread.h"
+
+namespace webrtc {
+namespace test {
+namespace {
+
+std::string ToString(const rtc::SocketAddress& addr) {
+  return addr.HostAsURIString() + ":" + std::to_string(addr.port());
+}
+
+}  // namespace
+
+FakeNetworkSocket::FakeNetworkSocket(SocketManager* socket_manager)
+    : socket_manager_(socket_manager),
+      state_(CS_CLOSED),
+      error_(0),
+      pending_read_events_count_(0) {}
+FakeNetworkSocket::~FakeNetworkSocket() {
+  Close();
+  socket_manager_->Unregister(this);
+}
+
+void FakeNetworkSocket::OnPacketReceived(EmulatedIpPacket packet) {
+  {
+    rtc::CritScope crit(&lock_);
+    packet_queue_.push_back(std::move(packet));
+    pending_read_events_count_++;
+  }
+  socket_manager_->WakeUp();
+}
+
+bool FakeNetworkSocket::ProcessIo() {
+  {
+    rtc::CritScope crit(&lock_);
+    if (pending_read_events_count_ == 0) {
+      return false;
+    }
+    pending_read_events_count_--;
+    RTC_DCHECK_GE(pending_read_events_count_, 0);
+  }
+  SignalReadEvent(this);
+  return true;
+}
+
+rtc::SocketAddress FakeNetworkSocket::GetLocalAddress() const {
+  return local_addr_;
+}
+
+rtc::SocketAddress FakeNetworkSocket::GetRemoteAddress() const {
+  return remote_addr_;
+}
+
+int FakeNetworkSocket::Bind(const rtc::SocketAddress& addr) {
+  RTC_CHECK(local_addr_.IsNil())
+      << "Socket already bound to address: " << ToString(local_addr_);
+  local_addr_ = addr;
+  endpoint_ = socket_manager_->GetEndpointNode(local_addr_.ipaddr());
+  if (!endpoint_) {
+    local_addr_.Clear();
+    RTC_LOG(INFO) << "No endpoint for address: " << ToString(addr);
+    error_ = EADDRNOTAVAIL;
+    return 2;
+  }
+  absl::optional<uint16_t> port =
+      endpoint_->BindReceiver(local_addr_.port(), this);
+  if (!port) {
+    local_addr_.Clear();
+    RTC_LOG(INFO) << "Cannot bind to in-use address: " << ToString(addr);
+    error_ = EADDRINUSE;
+    return 1;
+  }
+  local_addr_.SetPort(port.value());
+  return 0;
+}
+
+int FakeNetworkSocket::Connect(const rtc::SocketAddress& addr) {
+  RTC_CHECK(remote_addr_.IsNil())
+      << "Socket already connected to address: " << ToString(remote_addr_);
+  RTC_CHECK(!local_addr_.IsNil())
+      << "Socket have to be bind to some local address";
+  remote_addr_ = addr;
+  state_ = CS_CONNECTED;
+  return 0;
+}
+
+int FakeNetworkSocket::Send(const void* pv, size_t cb) {
+  RTC_CHECK(state_ == CS_CONNECTED) << "Socket cannot send: not connected";
+  return SendTo(pv, cb, remote_addr_);
+}
+
+int FakeNetworkSocket::SendTo(const void* pv,
+                              size_t cb,
+                              const rtc::SocketAddress& addr) {
+  RTC_CHECK(!local_addr_.IsNil())
+      << "Socket have to be bind to some local address";
+  rtc::CopyOnWriteBuffer packet(static_cast<const uint8_t*>(pv), cb);
+  endpoint_->SendPacket(local_addr_, addr, packet);
+  return cb;
+}
+
+int FakeNetworkSocket::Recv(void* pv, size_t cb, int64_t* timestamp) {
+  rtc::SocketAddress paddr;
+  return RecvFrom(pv, cb, &paddr, timestamp);
+}
+
+// Reads 1 packet from internal queue. Reads up to |cb| bytes into |pv|
+// and returns the length of received packet.
+int FakeNetworkSocket::RecvFrom(void* pv,
+                                size_t cb,
+                                rtc::SocketAddress* paddr,
+                                int64_t* timestamp) {
+  if (timestamp) {
+    *timestamp = -1;
+  }
+  absl::optional<EmulatedIpPacket> packetOpt = PopFrontPacket();
+
+  if (!packetOpt) {
+    error_ = EAGAIN;
+    return -1;
+  }
+
+  EmulatedIpPacket packet = std::move(packetOpt.value());
+  *paddr = packet.from;
+  size_t data_read = std::min(cb, packet.size());
+  memcpy(pv, packet.cdata(), data_read);
+  *timestamp = packet.arrival_time.us();
+
+  // According to RECV(2) Linux Man page
+  // real socket will discard data, that won't fit into provided buffer,
+  // but we won't to skip such error, so we will assert here.
+  RTC_CHECK(data_read == packet.size())
+      << "Too small buffer is provided for socket read. "
+      << "Received data size: " << packet.size()
+      << "; Provided buffer size: " << cb;
+
+  // According to RECV(2) Linux Man page
+  // real socket will return message length, not data read. In our case it is
+  // actually the same value.
+  return static_cast<int>(packet.size());
+}
+
+int FakeNetworkSocket::Listen(int backlog) {
+  RTC_CHECK(false) << "Listen() isn't valid for SOCK_DGRAM";
+}
+
+rtc::AsyncSocket* FakeNetworkSocket::Accept(rtc::SocketAddress* /*paddr*/) {
+  RTC_CHECK(false) << "Accept() isn't valid for SOCK_DGRAM";
+}
+
+int FakeNetworkSocket::Close() {
+  state_ = CS_CLOSED;
+  if (!local_addr_.IsNil()) {
+    endpoint_->UnbindReceiver(local_addr_.port());
+  }
+  local_addr_.Clear();
+  remote_addr_.Clear();
+  return 0;
+}
+
+int FakeNetworkSocket::GetError() const {
+  RTC_CHECK(error_ == 0);
+  return error_;
+}
+
+void FakeNetworkSocket::SetError(int error) {
+  RTC_CHECK(error == 0);
+  error_ = error;
+}
+
+rtc::AsyncSocket::ConnState FakeNetworkSocket::GetState() const {
+  return state_;
+}
+
+int FakeNetworkSocket::GetOption(Option opt, int* value) {
+  auto it = options_map_.find(opt);
+  if (it == options_map_.end()) {
+    return -1;
+  }
+  *value = it->second;
+  return 0;
+}
+
+int FakeNetworkSocket::SetOption(Option opt, int value) {
+  options_map_[opt] = value;
+  return 0;
+}
+
+absl::optional<EmulatedIpPacket> FakeNetworkSocket::PopFrontPacket() {
+  rtc::CritScope crit(&lock_);
+  if (packet_queue_.empty()) {
+    return absl::nullopt;
+  }
+
+  absl::optional<EmulatedIpPacket> packet =
+      absl::make_optional(std::move(packet_queue_.front()));
+  packet_queue_.pop_front();
+  return packet;
+}
+
+}  // namespace test
+}  // namespace webrtc
diff --git a/test/scenario/network/fake_network_socket.h b/test/scenario/network/fake_network_socket.h
new file mode 100644
index 0000000..fcd9d27
--- /dev/null
+++ b/test/scenario/network/fake_network_socket.h
@@ -0,0 +1,105 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_
+#define TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_
+
+#include <deque>
+#include <map>
+#include <vector>
+
+#include "rtc_base/async_socket.h"
+#include "rtc_base/copy_on_write_buffer.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/socket_address.h"
+#include "test/scenario/network/network_emulation.h"
+
+namespace webrtc {
+namespace test {
+
+class SocketIoProcessor {
+ public:
+  virtual ~SocketIoProcessor() = default;
+
+  // Process single IO operation.
+  virtual bool ProcessIo() = 0;
+};
+
+class SocketManager {
+ public:
+  virtual ~SocketManager() = default;
+
+  virtual void WakeUp() = 0;
+  virtual void Unregister(SocketIoProcessor* io_processor) = 0;
+  // Provides endpoints by IP address.
+  virtual EndpointNode* GetEndpointNode(const rtc::IPAddress& ip) = 0;
+};
+
+// Represents a socket, which will operate with emulated network.
+class FakeNetworkSocket : public rtc::AsyncSocket,
+                          public EmulatedNetworkReceiverInterface,
+                          public SocketIoProcessor {
+ public:
+  explicit FakeNetworkSocket(SocketManager* scoket_manager);
+  ~FakeNetworkSocket() override;
+
+  // Will be invoked by EndpointNode to deliver packets into this socket.
+  void OnPacketReceived(EmulatedIpPacket packet) override;
+  // Will fire read event for incoming packets.
+  bool ProcessIo() override;
+
+  // rtc::Socket methods:
+  rtc::SocketAddress GetLocalAddress() const override;
+  rtc::SocketAddress GetRemoteAddress() const override;
+  int Bind(const rtc::SocketAddress& addr) override;
+  int Connect(const rtc::SocketAddress& addr) override;
+  int Close() override;
+  int Send(const void* pv, size_t cb) override;
+  int SendTo(const void* pv,
+             size_t cb,
+             const rtc::SocketAddress& addr) override;
+  int Recv(void* pv, size_t cb, int64_t* timestamp) override;
+  int RecvFrom(void* pv,
+               size_t cb,
+               rtc::SocketAddress* paddr,
+               int64_t* timestamp) override;
+  int Listen(int backlog) override;
+  rtc::AsyncSocket* Accept(rtc::SocketAddress* paddr) override;
+  int GetError() const override;
+  void SetError(int error) override;
+  ConnState GetState() const override;
+  int GetOption(Option opt, int* value) override;
+  int SetOption(Option opt, int value) override;
+
+ private:
+  absl::optional<EmulatedIpPacket> PopFrontPacket();
+
+  SocketManager* const socket_manager_;
+  EndpointNode* endpoint_;
+
+  rtc::SocketAddress local_addr_;
+  rtc::SocketAddress remote_addr_;
+  ConnState state_;
+  int error_;
+  std::map<Option, int> options_map_;
+
+  rtc::CriticalSection lock_;
+  // Count of packets in the queue for which we didn't fire read event.
+  // |pending_read_events_count_| can be different from |packet_queue_.size()|
+  // because read events will be fired by one thread and packets in the queue
+  // can be processed by another thread.
+  int pending_read_events_count_;
+  std::deque<EmulatedIpPacket> packet_queue_ RTC_GUARDED_BY(lock_);
+};
+
+}  // namespace test
+}  // namespace webrtc
+
+#endif  // TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_H_
diff --git a/test/scenario/network/fake_network_socket_server.cc b/test/scenario/network/fake_network_socket_server.cc
new file mode 100644
index 0000000..b7d1fc4
--- /dev/null
+++ b/test/scenario/network/fake_network_socket_server.cc
@@ -0,0 +1,98 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/scenario/network/fake_network_socket_server.h"
+
+#include <utility>
+
+namespace webrtc {
+namespace test {
+
+FakeNetworkSocketServer::FakeNetworkSocketServer(
+    Clock* clock,
+    std::vector<EndpointNode*> endpoints)
+    : clock_(clock),
+      endpoints_(std::move(endpoints)),
+      wakeup_(/*manual_reset=*/false, /*initially_signaled=*/false) {}
+FakeNetworkSocketServer::~FakeNetworkSocketServer() = default;
+
+void FakeNetworkSocketServer::OnMessageQueueDestroyed() {
+  msg_queue_ = nullptr;
+}
+
+EndpointNode* FakeNetworkSocketServer::GetEndpointNode(
+    const rtc::IPAddress& ip) {
+  for (auto* endpoint : endpoints_) {
+    rtc::IPAddress peerLocalAddress = endpoint->GetPeerLocalAddress();
+    if (peerLocalAddress == ip) {
+      return endpoint;
+    }
+  }
+  RTC_CHECK(false) << "No network found for address" << ip.ToString();
+}
+
+void FakeNetworkSocketServer::Unregister(SocketIoProcessor* io_processor) {
+  rtc::CritScope crit(&lock_);
+  io_processors_.erase(io_processor);
+}
+
+rtc::Socket* FakeNetworkSocketServer::CreateSocket(int /*family*/,
+                                                   int /*type*/) {
+  RTC_CHECK(false) << "Only async sockets are supported";
+}
+
+rtc::AsyncSocket* FakeNetworkSocketServer::CreateAsyncSocket(int family,
+                                                             int type) {
+  RTC_DCHECK(family == AF_INET || family == AF_INET6);
+  // We support only UDP sockets for now.
+  RTC_DCHECK(type == SOCK_DGRAM) << "Only UDP sockets are supported";
+  FakeNetworkSocket* out = new FakeNetworkSocket(this);
+  {
+    rtc::CritScope crit(&lock_);
+    io_processors_.insert(out);
+  }
+  return out;
+}
+
+void FakeNetworkSocketServer::SetMessageQueue(rtc::MessageQueue* msg_queue) {
+  msg_queue_ = msg_queue;
+  if (msg_queue_) {
+    msg_queue_->SignalQueueDestroyed.connect(
+        this, &FakeNetworkSocketServer::OnMessageQueueDestroyed);
+  }
+}
+
+// Always returns true (if return false, it won't be invoked again...)
+bool FakeNetworkSocketServer::Wait(int cms, bool process_io) {
+  RTC_DCHECK(msg_queue_ == rtc::Thread::Current());
+  if (!process_io) {
+    wakeup_.Wait(cms);
+    return true;
+  }
+  wakeup_.Wait(cms);
+
+  rtc::CritScope crit(&lock_);
+  for (auto* io_processor : io_processors_) {
+    while (io_processor->ProcessIo()) {
+    }
+  }
+  return true;
+}
+
+void FakeNetworkSocketServer::WakeUp() {
+  wakeup_.Set();
+}
+
+Timestamp FakeNetworkSocketServer::Now() const {
+  return Timestamp::us(clock_->TimeInMicroseconds());
+}
+
+}  // namespace test
+}  // namespace webrtc
diff --git a/test/scenario/network/fake_network_socket_server.h b/test/scenario/network/fake_network_socket_server.h
new file mode 100644
index 0000000..f0f9496
--- /dev/null
+++ b/test/scenario/network/fake_network_socket_server.h
@@ -0,0 +1,70 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_
+#define TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_
+
+#include <set>
+#include <vector>
+
+#include "api/units/timestamp.h"
+#include "rtc_base/async_socket.h"
+#include "rtc_base/critical_section.h"
+#include "rtc_base/event.h"
+#include "rtc_base/message_queue.h"
+#include "rtc_base/socket.h"
+#include "rtc_base/socket_address.h"
+#include "rtc_base/socket_server.h"
+#include "rtc_base/third_party/sigslot/sigslot.h"
+#include "system_wrappers/include/clock.h"
+#include "test/scenario/network/fake_network_socket.h"
+
+namespace webrtc {
+namespace test {
+
+// FakeNetworkSocketServer must outlive any sockets it creates.
+class FakeNetworkSocketServer : public rtc::SocketServer,
+                                public sigslot::has_slots<>,
+                                public SocketManager {
+ public:
+  FakeNetworkSocketServer(Clock* clock, std::vector<EndpointNode*> endpoints);
+  ~FakeNetworkSocketServer() override;
+
+  EndpointNode* GetEndpointNode(const rtc::IPAddress& ip) override;
+  void Unregister(SocketIoProcessor* io_processor) override;
+  void OnMessageQueueDestroyed();
+
+  // rtc::SocketFactory methods:
+  rtc::Socket* CreateSocket(int family, int type) override;
+  rtc::AsyncSocket* CreateAsyncSocket(int family, int type) override;
+
+  // rtc::SocketServer methods:
+  // Called by the network thread when this server is installed, kicking off the
+  // message handler loop.
+  void SetMessageQueue(rtc::MessageQueue* msg_queue) override;
+  bool Wait(int cms, bool process_io) override;
+  void WakeUp() override;
+
+ private:
+  Timestamp Now() const;
+
+  Clock* const clock_;
+  const std::vector<EndpointNode*> endpoints_;
+  rtc::Event wakeup_;
+  rtc::MessageQueue* msg_queue_;
+
+  rtc::CriticalSection lock_;
+  std::set<SocketIoProcessor*> io_processors_ RTC_GUARDED_BY(lock_);
+};
+
+}  // namespace test
+}  // namespace webrtc
+
+#endif  // TEST_SCENARIO_NETWORK_FAKE_NETWORK_SOCKET_SERVER_H_
diff --git a/test/scenario/network/network_emulation.cc b/test/scenario/network/network_emulation.cc
index 9ff24e7..9fd4925 100644
--- a/test/scenario/network/network_emulation.cc
+++ b/test/scenario/network/network_emulation.cc
@@ -10,9 +10,11 @@
 
 #include "test/scenario/network/network_emulation.h"
 
+#include <limits>
 #include <memory>
 
 #include "absl/memory/memory.h"
+#include "rtc_base/bind.h"
 #include "rtc_base/logging.h"
 
 namespace webrtc {
@@ -28,10 +30,9 @@
       dest_endpoint_id(dest_endpoint_id),
       data(data),
       arrival_time(arrival_time) {}
-
 EmulatedIpPacket::~EmulatedIpPacket() = default;
-
 EmulatedIpPacket::EmulatedIpPacket(EmulatedIpPacket&&) = default;
+EmulatedIpPacket& EmulatedIpPacket::operator=(EmulatedIpPacket&&) = default;
 
 void EmulatedNetworkNode::CreateRoute(
     uint64_t receiver_id,
@@ -57,8 +58,9 @@
 
 void EmulatedNetworkNode::OnPacketReceived(EmulatedIpPacket packet) {
   rtc::CritScope crit(&lock_);
-  if (routing_.find(packet.dest_endpoint_id) == routing_.end())
+  if (routing_.find(packet.dest_endpoint_id) == routing_.end()) {
     return;
+  }
   uint64_t packet_id = next_packet_id_++;
   bool sent = network_behavior_->EnqueuePacket(
       PacketInFlightInfo(packet.size(), packet.arrival_time.us(), packet_id));
@@ -119,7 +121,7 @@
                 .insert(std::pair<uint64_t, EmulatedNetworkReceiverInterface*>(
                     dest_endpoint_id, receiver))
                 .second)
-      << "Such routing already exists";
+      << "Routing for endpoint " << dest_endpoint_id << " already exists";
 }
 
 void EmulatedNetworkNode::RemoveReceiver(uint64_t dest_endpoint_id) {
@@ -127,5 +129,111 @@
   routing_.erase(dest_endpoint_id);
 }
 
+EndpointNode::EndpointNode(uint64_t id, rtc::IPAddress ip, Clock* clock)
+    : id_(id),
+      peer_local_addr_(ip),
+      send_node_(nullptr),
+      clock_(clock),
+      next_port_(kFirstEphemeralPort),
+      connected_endpoint_id_(absl::nullopt) {}
+EndpointNode::~EndpointNode() = default;
+
+uint64_t EndpointNode::GetId() const {
+  return id_;
+}
+
+void EndpointNode::SetSendNode(EmulatedNetworkNode* send_node) {
+  send_node_ = send_node;
+}
+
+void EndpointNode::SendPacket(const rtc::SocketAddress& from,
+                              const rtc::SocketAddress& to,
+                              rtc::CopyOnWriteBuffer packet) {
+  RTC_CHECK(from.ipaddr() == peer_local_addr_);
+  RTC_CHECK(connected_endpoint_id_);
+  RTC_CHECK(send_node_);
+  send_node_->OnPacketReceived(EmulatedIpPacket(
+      from, to, connected_endpoint_id_.value(), std::move(packet),
+      Timestamp::us(clock_->TimeInMicroseconds())));
+}
+
+absl::optional<uint16_t> EndpointNode::BindReceiver(
+    uint16_t desired_port,
+    EmulatedNetworkReceiverInterface* receiver) {
+  rtc::CritScope crit(&receiver_lock_);
+  uint16_t port = desired_port;
+  if (port == 0) {
+    // Because client can specify its own port, next_port_ can be already in
+    // use, so we need to find next available port.
+    int ports_pool_size =
+        std::numeric_limits<uint16_t>::max() - kFirstEphemeralPort + 1;
+    for (int i = 0; i < ports_pool_size; ++i) {
+      uint16_t next_port = NextPort();
+      if (port_to_receiver_.find(next_port) == port_to_receiver_.end()) {
+        port = next_port;
+        break;
+      }
+    }
+  }
+  RTC_CHECK(port != 0) << "Can't find free port for receiver in endpoint "
+                       << id_;
+  bool result = port_to_receiver_.insert({port, receiver}).second;
+  if (!result) {
+    RTC_LOG(INFO) << "Can't bind receiver to used port " << desired_port
+                  << " in endpoint " << id_;
+    return absl::nullopt;
+  }
+  RTC_LOG(INFO) << "New receiver is binded to endpoint " << id_ << " on port "
+                << port;
+  return port;
+}
+
+uint16_t EndpointNode::NextPort() {
+  uint16_t out = next_port_;
+  if (next_port_ == std::numeric_limits<uint16_t>::max()) {
+    next_port_ = kFirstEphemeralPort;
+  } else {
+    next_port_++;
+  }
+  return out;
+}
+
+void EndpointNode::UnbindReceiver(uint16_t port) {
+  rtc::CritScope crit(&receiver_lock_);
+  port_to_receiver_.erase(port);
+}
+
+rtc::IPAddress EndpointNode::GetPeerLocalAddress() const {
+  return peer_local_addr_;
+}
+
+void EndpointNode::OnPacketReceived(EmulatedIpPacket packet) {
+  RTC_CHECK(packet.dest_endpoint_id == id_)
+      << "Routing error: wrong destination endpoint. Destination id: "
+      << packet.dest_endpoint_id << "; Receiver id: " << id_;
+  rtc::CritScope crit(&receiver_lock_);
+  auto it = port_to_receiver_.find(packet.to.port());
+  if (it == port_to_receiver_.end()) {
+    // It can happen, that remote peer closed connection, but there still some
+    // packets, that are going to it. It can happen during peer connection close
+    // process: one peer closed connection, second still sending data.
+    RTC_LOG(INFO) << "No receiver registered in " << id_ << " on port "
+                  << packet.to.port();
+    return;
+  }
+  // Endpoint assumes frequent calls to bind and unbind methods, so it holds
+  // lock during packet processing to ensure that receiver won't be deleted
+  // before call to OnPacketReceived.
+  it->second->OnPacketReceived(std::move(packet));
+}
+
+EmulatedNetworkNode* EndpointNode::GetSendNode() const {
+  return send_node_;
+}
+
+void EndpointNode::SetConnectedEndpointId(uint64_t endpoint_id) {
+  connected_endpoint_id_ = endpoint_id;
+}
+
 }  // namespace test
 }  // namespace webrtc
diff --git a/test/scenario/network/network_emulation.h b/test/scenario/network/network_emulation.h
index ba87984..d133337 100644
--- a/test/scenario/network/network_emulation.h
+++ b/test/scenario/network/network_emulation.h
@@ -23,8 +23,10 @@
 #include "api/units/timestamp.h"
 #include "rtc_base/async_socket.h"
 #include "rtc_base/copy_on_write_buffer.h"
+#include "rtc_base/critical_section.h"
 #include "rtc_base/socket_address.h"
 #include "rtc_base/thread.h"
+#include "system_wrappers/include/clock.h"
 
 namespace webrtc {
 namespace test {
@@ -36,7 +38,6 @@
                    uint64_t dest_endpoint_id,
                    rtc::CopyOnWriteBuffer data,
                    Timestamp arrival_time);
-
   ~EmulatedIpPacket();
   // This object is not copyable or assignable.
   EmulatedIpPacket(const EmulatedIpPacket&) = delete;
@@ -107,6 +108,72 @@
   uint64_t next_packet_id_ RTC_GUARDED_BY(lock_) = 1;
 };
 
+// Represents single network interface on the device.
+// It will be used as sender from socket side to send data to the network and
+// will act as packet receiver from emulated network side to receive packets
+// from other EmulatedNetworkNodes.
+class EndpointNode : public EmulatedNetworkReceiverInterface {
+ public:
+  EndpointNode(uint64_t id, rtc::IPAddress, Clock* clock);
+  ~EndpointNode() override;
+
+  uint64_t GetId() const;
+
+  // Set network node, that will be used to send packets to the network.
+  void SetSendNode(EmulatedNetworkNode* send_node);
+  // Send packet into network.
+  // |from| will be used to set source address for the packet in destination
+  // socket.
+  // |to| will be used for routing verification and picking right socket by port
+  // on destination endpoint.
+  void SendPacket(const rtc::SocketAddress& from,
+                  const rtc::SocketAddress& to,
+                  rtc::CopyOnWriteBuffer packet);
+
+  // Binds receiver to this endpoint to send and receive data.
+  // |desired_port| is a port that should be used. If it is equal to 0,
+  // endpoint will pick the first available port starting from
+  // |kFirstEphemeralPort|.
+  //
+  // Returns the port, that should be used (it will be equals to desired, if
+  // |desired_port| != 0 and is free or will be the one, selected by endpoint)
+  // or absl::nullopt if desired_port in used. Also fails if there are no more
+  // free ports to bind to.
+  absl::optional<uint16_t> BindReceiver(
+      uint16_t desired_port,
+      EmulatedNetworkReceiverInterface* receiver);
+  void UnbindReceiver(uint16_t port);
+
+  rtc::IPAddress GetPeerLocalAddress() const;
+
+  // Will be called to deliver packet into endpoint from network node.
+  void OnPacketReceived(EmulatedIpPacket packet) override;
+
+ protected:
+  friend class NetworkEmulationManager;
+
+  EmulatedNetworkNode* GetSendNode() const;
+  void SetConnectedEndpointId(uint64_t endpoint_id);
+
+ private:
+  static constexpr uint16_t kFirstEphemeralPort = 49152;
+  uint16_t NextPort() RTC_EXCLUSIVE_LOCKS_REQUIRED(receiver_lock_);
+
+  rtc::CriticalSection receiver_lock_;
+
+  uint64_t id_;
+  // Peer's local IP address for this endpoint network interface.
+  const rtc::IPAddress peer_local_addr_;
+  EmulatedNetworkNode* send_node_;
+  Clock* const clock_;
+
+  uint16_t next_port_ RTC_GUARDED_BY(receiver_lock_);
+  std::map<uint16_t, EmulatedNetworkReceiverInterface*> port_to_receiver_
+      RTC_GUARDED_BY(receiver_lock_);
+
+  absl::optional<uint64_t> connected_endpoint_id_;
+};
+
 }  // namespace test
 }  // namespace webrtc
 
diff --git a/test/scenario/network/network_emulation_manager.cc b/test/scenario/network/network_emulation_manager.cc
new file mode 100644
index 0000000..629f448
--- /dev/null
+++ b/test/scenario/network/network_emulation_manager.cc
@@ -0,0 +1,136 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include "test/scenario/network/network_emulation_manager.h"
+
+#include <algorithm>
+#include <memory>
+
+#include "absl/memory/memory.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+
+namespace webrtc {
+namespace test {
+namespace {
+
+constexpr int64_t kPacketProcessingIntervalMs = 1;
+
+}  // namespace
+
+NetworkEmulationManager::NetworkEmulationManager(webrtc::Clock* clock)
+    : clock_(clock),
+      next_node_id_(1),
+      task_queue_("network_emulation_manager") {}
+NetworkEmulationManager::~NetworkEmulationManager() {
+  Stop();
+}
+
+EmulatedNetworkNode* NetworkEmulationManager::CreateEmulatedNode(
+    std::unique_ptr<NetworkBehaviorInterface> network_behavior) {
+  auto node =
+      absl::make_unique<EmulatedNetworkNode>(std::move(network_behavior));
+  EmulatedNetworkNode* out = node.get();
+
+  struct Closure {
+    void operator()() { manager->network_nodes_.push_back(std::move(node)); }
+    NetworkEmulationManager* manager;
+    std::unique_ptr<EmulatedNetworkNode> node;
+  };
+  task_queue_.PostTask(Closure{this, std::move(node)});
+  return out;
+}
+
+EndpointNode* NetworkEmulationManager::CreateEndpoint(rtc::IPAddress ip) {
+  auto node = absl::make_unique<EndpointNode>(next_node_id_++, ip, clock_);
+  EndpointNode* out = node.get();
+  endpoints_.push_back(std::move(node));
+  return out;
+}
+
+void NetworkEmulationManager::CreateRoute(
+    EndpointNode* from,
+    std::vector<EmulatedNetworkNode*> via_nodes,
+    EndpointNode* to) {
+  // Because endpoint has no send node by default at least one should be
+  // provided here.
+  RTC_CHECK(!via_nodes.empty());
+
+  from->SetSendNode(via_nodes[0]);
+  EmulatedNetworkNode* cur_node = via_nodes[0];
+  for (size_t i = 1; i < via_nodes.size(); ++i) {
+    cur_node->SetReceiver(to->GetId(), via_nodes[i]);
+    cur_node = via_nodes[i];
+  }
+  cur_node->SetReceiver(to->GetId(), to);
+  from->SetConnectedEndpointId(to->GetId());
+}
+
+void NetworkEmulationManager::ClearRoute(
+    EndpointNode* from,
+    std::vector<EmulatedNetworkNode*> via_nodes,
+    EndpointNode* to) {
+  // Remove receiver from intermediate nodes.
+  for (auto* node : via_nodes) {
+    node->RemoveReceiver(to->GetId());
+  }
+  // Detach endpoint from current send node.
+  if (from->GetSendNode()) {
+    from->GetSendNode()->RemoveReceiver(to->GetId());
+    from->SetSendNode(nullptr);
+  }
+}
+
+rtc::Thread* NetworkEmulationManager::CreateNetworkThread(
+    std::vector<EndpointNode*> endpoints) {
+  FakeNetworkSocketServer* socket_server = CreateSocketServer(endpoints);
+  std::unique_ptr<rtc::Thread> network_thread =
+      absl::make_unique<rtc::Thread>(socket_server);
+  network_thread->SetName("network_thread" + std::to_string(threads_.size()),
+                          nullptr);
+  network_thread->Start();
+  rtc::Thread* out = network_thread.get();
+  threads_.push_back(std::move(network_thread));
+  return out;
+}
+
+void NetworkEmulationManager::Start() {
+  process_task_handle_ = RepeatingTaskHandle::Start(&task_queue_, [this] {
+    ProcessNetworkPackets();
+    return TimeDelta::ms(kPacketProcessingIntervalMs);
+  });
+}
+
+void NetworkEmulationManager::Stop() {
+  process_task_handle_.PostStop();
+}
+
+FakeNetworkSocketServer* NetworkEmulationManager::CreateSocketServer(
+    std::vector<EndpointNode*> endpoints) {
+  auto socket_server =
+      absl::make_unique<FakeNetworkSocketServer>(clock_, endpoints);
+  FakeNetworkSocketServer* out = socket_server.get();
+  socket_servers_.push_back(std::move(socket_server));
+  return out;
+}
+
+void NetworkEmulationManager::ProcessNetworkPackets() {
+  Timestamp current_time = Now();
+  for (auto& node : network_nodes_) {
+    node->Process(current_time);
+  }
+}
+
+Timestamp NetworkEmulationManager::Now() const {
+  return Timestamp::us(clock_->TimeInMicroseconds());
+}
+
+}  // namespace test
+}  // namespace webrtc
diff --git a/test/scenario/network/network_emulation_manager.h b/test/scenario/network/network_emulation_manager.h
new file mode 100644
index 0000000..f06fb75
--- /dev/null
+++ b/test/scenario/network/network_emulation_manager.h
@@ -0,0 +1,80 @@
+/*
+ *  Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#ifndef TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_
+#define TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_
+
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "api/test/simulated_network.h"
+#include "api/units/time_delta.h"
+#include "api/units/timestamp.h"
+#include "rtc_base/logging.h"
+#include "rtc_base/task_queue.h"
+#include "rtc_base/task_utils/repeating_task.h"
+#include "rtc_base/thread.h"
+#include "test/scenario/network/fake_network_socket_server.h"
+#include "test/scenario/network/network_emulation.h"
+
+namespace webrtc {
+namespace test {
+
+class NetworkEmulationManager {
+ public:
+  explicit NetworkEmulationManager(Clock* clock);
+  ~NetworkEmulationManager();
+
+  EmulatedNetworkNode* CreateEmulatedNode(
+      std::unique_ptr<NetworkBehaviorInterface> network_behavior);
+
+  // TODO(titovartem) add method without IP address, where manager
+  // will provided some unique generated address.
+  EndpointNode* CreateEndpoint(rtc::IPAddress ip);
+
+  void CreateRoute(EndpointNode* from,
+                   std::vector<EmulatedNetworkNode*> via_nodes,
+                   EndpointNode* to);
+  void ClearRoute(EndpointNode* from,
+                  std::vector<EmulatedNetworkNode*> via_nodes,
+                  EndpointNode* to);
+
+  rtc::Thread* CreateNetworkThread(std::vector<EndpointNode*> endpoints);
+
+  void Start();
+  void Stop();
+
+ private:
+  FakeNetworkSocketServer* CreateSocketServer(
+      std::vector<EndpointNode*> endpoints);
+  void ProcessNetworkPackets();
+  Timestamp Now() const;
+
+  Clock* const clock_;
+  int next_node_id_;
+
+  RepeatingTaskHandle process_task_handle_;
+
+  // All objects can be added to the manager only when it is idle.
+  std::vector<std::unique_ptr<EndpointNode>> endpoints_;
+  std::vector<std::unique_ptr<EmulatedNetworkNode>> network_nodes_;
+  std::vector<std::unique_ptr<FakeNetworkSocketServer>> socket_servers_;
+  std::vector<std::unique_ptr<rtc::Thread>> threads_;
+
+  // Must be the last field, so it will be deleted first, because tasks
+  // in the TaskQueue can access other fields of the instance of this class.
+  rtc::TaskQueue task_queue_;
+};
+
+}  // namespace test
+}  // namespace webrtc
+
+#endif  // TEST_SCENARIO_NETWORK_NETWORK_EMULATION_MANAGER_H_
diff --git a/test/scenario/network/network_emulation_pc_unittest.cc b/test/scenario/network/network_emulation_pc_unittest.cc
new file mode 100644
index 0000000..105cb56
--- /dev/null
+++ b/test/scenario/network/network_emulation_pc_unittest.cc
@@ -0,0 +1,203 @@
+/*
+ *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <cstdint>
+#include <memory>
+
+#include "absl/memory/memory.h"
+#include "api/audio_codecs/builtin_audio_decoder_factory.h"
+#include "api/audio_codecs/builtin_audio_encoder_factory.h"
+#include "api/call/call_factory_interface.h"
+#include "api/peer_connection_interface.h"
+#include "api/scoped_refptr.h"
+#include "api/video_codecs/builtin_video_decoder_factory.h"
+#include "api/video_codecs/builtin_video_encoder_factory.h"
+#include "call/simulated_network.h"
+#include "logging/rtc_event_log/rtc_event_log_factory.h"
+#include "media/engine/webrtc_media_engine.h"
+#include "modules/audio_device/include/test_audio_device.h"
+#include "p2p/client/basic_port_allocator.h"
+#include "pc/peer_connection_wrapper.h"
+#include "pc/test/mock_peer_connection_observers.h"
+#include "rtc_base/async_invoker.h"
+#include "rtc_base/fake_network.h"
+#include "rtc_base/gunit.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/scenario/network/network_emulation.h"
+#include "test/scenario/network/network_emulation_manager.h"
+
+namespace webrtc {
+namespace test {
+namespace {
+
+constexpr int kDefaultTimeoutMs = 1000;
+constexpr int kMaxAptitude = 32000;
+constexpr int kSamplingFrequency = 48000;
+constexpr char kSignalThreadName[] = "signaling_thread";
+
+bool AddIceCandidates(PeerConnectionWrapper* peer,
+                      std::vector<const IceCandidateInterface*> candidates) {
+  bool success = true;
+  for (const auto candidate : candidates) {
+    if (!peer->pc()->AddIceCandidate(candidate)) {
+      success = false;
+    }
+  }
+  return success;
+}
+
+rtc::scoped_refptr<PeerConnectionFactoryInterface> CreatePeerConnectionFactory(
+    rtc::Thread* signaling_thread,
+    rtc::Thread* network_thread) {
+  PeerConnectionFactoryDependencies pcf_deps;
+  pcf_deps.call_factory = webrtc::CreateCallFactory();
+  pcf_deps.event_log_factory = webrtc::CreateRtcEventLogFactory();
+  pcf_deps.network_thread = network_thread;
+  pcf_deps.signaling_thread = signaling_thread;
+  pcf_deps.media_engine = cricket::WebRtcMediaEngineFactory::Create(
+      TestAudioDeviceModule::CreateTestAudioDeviceModule(
+          TestAudioDeviceModule::CreatePulsedNoiseCapturer(kMaxAptitude,
+                                                           kSamplingFrequency),
+          TestAudioDeviceModule::CreateDiscardRenderer(kSamplingFrequency)),
+      webrtc::CreateBuiltinAudioEncoderFactory(),
+      webrtc::CreateBuiltinAudioDecoderFactory(),
+      webrtc::CreateBuiltinVideoEncoderFactory(),
+      webrtc::CreateBuiltinVideoDecoderFactory(), /*audio_mixer=*/nullptr,
+      webrtc::AudioProcessingBuilder().Create());
+  return CreateModularPeerConnectionFactory(std::move(pcf_deps));
+}
+
+rtc::scoped_refptr<PeerConnectionInterface> CreatePeerConnection(
+    const rtc::scoped_refptr<PeerConnectionFactoryInterface>& pcf,
+    PeerConnectionObserver* observer,
+    rtc::NetworkManager* network_manager) {
+  PeerConnectionDependencies pc_deps(observer);
+  auto port_allocator =
+      absl::make_unique<cricket::BasicPortAllocator>(network_manager);
+
+  // This test does not support TCP
+  int flags = cricket::PORTALLOCATOR_DISABLE_TCP;
+  port_allocator->set_flags(port_allocator->flags() | flags);
+
+  pc_deps.allocator = std::move(port_allocator);
+  PeerConnectionInterface::RTCConfiguration rtc_configuration;
+  rtc_configuration.sdp_semantics = SdpSemantics::kUnifiedPlan;
+
+  return pcf->CreatePeerConnection(rtc_configuration, std::move(pc_deps));
+}
+
+}  // namespace
+
+TEST(NetworkEmulationManagerPCTest, Run) {
+  std::unique_ptr<rtc::Thread> signaling_thread = rtc::Thread::Create();
+  signaling_thread->SetName(kSignalThreadName, nullptr);
+  signaling_thread->Start();
+
+  // Setup emulated network
+  NetworkEmulationManager network_manager(Clock::GetRealTimeClock());
+
+  EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  rtc::IPAddress alice_ip(1);
+  EndpointNode* alice_endpoint = network_manager.CreateEndpoint(alice_ip);
+  rtc::IPAddress bob_ip(2);
+  EndpointNode* bob_endpoint = network_manager.CreateEndpoint(bob_ip);
+  network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint);
+  network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint);
+
+  rtc::Thread* alice_network_thread =
+      network_manager.CreateNetworkThread({alice_endpoint});
+  rtc::Thread* bob_network_thread =
+      network_manager.CreateNetworkThread({bob_endpoint});
+
+  // Setup peer connections.
+  rtc::scoped_refptr<PeerConnectionFactoryInterface> alice_pcf;
+  rtc::scoped_refptr<PeerConnectionInterface> alice_pc;
+  std::unique_ptr<MockPeerConnectionObserver> alice_observer =
+      absl::make_unique<MockPeerConnectionObserver>();
+  std::unique_ptr<rtc::FakeNetworkManager> alice_network_manager =
+      absl::make_unique<rtc::FakeNetworkManager>();
+  alice_network_manager->AddInterface(rtc::SocketAddress(alice_ip, 0));
+
+  rtc::scoped_refptr<PeerConnectionFactoryInterface> bob_pcf;
+  rtc::scoped_refptr<PeerConnectionInterface> bob_pc;
+  std::unique_ptr<MockPeerConnectionObserver> bob_observer =
+      absl::make_unique<MockPeerConnectionObserver>();
+  std::unique_ptr<rtc::FakeNetworkManager> bob_network_manager =
+      absl::make_unique<rtc::FakeNetworkManager>();
+  bob_network_manager->AddInterface(rtc::SocketAddress(bob_ip, 0));
+
+  signaling_thread->Invoke<void>(RTC_FROM_HERE, [&]() {
+    alice_pcf = CreatePeerConnectionFactory(signaling_thread.get(),
+                                            alice_network_thread);
+    alice_pc = CreatePeerConnection(alice_pcf, alice_observer.get(),
+                                    alice_network_manager.get());
+
+    bob_pcf =
+        CreatePeerConnectionFactory(signaling_thread.get(), bob_network_thread);
+    bob_pc = CreatePeerConnection(bob_pcf, bob_observer.get(),
+                                  bob_network_manager.get());
+  });
+
+  std::unique_ptr<PeerConnectionWrapper> alice =
+      absl::make_unique<PeerConnectionWrapper>(alice_pcf, alice_pc,
+                                               std::move(alice_observer));
+  std::unique_ptr<PeerConnectionWrapper> bob =
+      absl::make_unique<PeerConnectionWrapper>(bob_pcf, bob_pc,
+                                               std::move(bob_observer));
+
+  network_manager.Start();
+
+  signaling_thread->Invoke<void>(RTC_FROM_HERE, [&]() {
+    rtc::scoped_refptr<DataChannelInterface> channel =
+        alice->CreateDataChannel("data");
+
+    // Connect peers.
+    ASSERT_TRUE(alice->ExchangeOfferAnswerWith(bob.get()));
+    // Do the SDP negotiation, and also exchange ice candidates.
+    ASSERT_TRUE_WAIT(
+        alice->signaling_state() == PeerConnectionInterface::kStable,
+        kDefaultTimeoutMs);
+    ASSERT_TRUE_WAIT(alice->IsIceGatheringDone(), kDefaultTimeoutMs);
+    ASSERT_TRUE_WAIT(bob->IsIceGatheringDone(), kDefaultTimeoutMs);
+
+    // Connect an ICE candidate pairs.
+    ASSERT_TRUE(
+        AddIceCandidates(bob.get(), alice->observer()->GetAllCandidates()));
+    ASSERT_TRUE(
+        AddIceCandidates(alice.get(), bob->observer()->GetAllCandidates()));
+    // This means that ICE and DTLS are connected.
+    ASSERT_TRUE_WAIT(bob->IsIceConnected(), kDefaultTimeoutMs);
+    ASSERT_TRUE_WAIT(alice->IsIceConnected(), kDefaultTimeoutMs);
+
+    ASSERT_TRUE_WAIT(bob->observer()->last_datachannel_ != nullptr,
+                     kDefaultTimeoutMs);
+    MockDataChannelObserver observer(bob->observer()->last_datachannel_);
+    channel->Send(DataBuffer("Test data"));
+    ASSERT_TRUE_WAIT(observer.received_message_count() == 1, kDefaultTimeoutMs);
+    ASSERT_EQ("Test data", observer.last_message());
+
+    // Close peer connections
+    alice->pc()->Close();
+    bob->pc()->Close();
+
+    // Delete peers.
+    alice.reset();
+    bob.reset();
+  });
+
+  network_manager.Stop();
+}
+
+}  // namespace test
+}  // namespace webrtc
diff --git a/test/scenario/network/network_emulation_unittest.cc b/test/scenario/network/network_emulation_unittest.cc
new file mode 100644
index 0000000..d04a99f
--- /dev/null
+++ b/test/scenario/network/network_emulation_unittest.cc
@@ -0,0 +1,114 @@
+/*
+ *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
+ *
+ *  Use of this source code is governed by a BSD-style license
+ *  that can be found in the LICENSE file in the root of the source
+ *  tree. An additional intellectual property rights grant can be found
+ *  in the file PATENTS.  All contributing project authors may
+ *  be found in the AUTHORS file in the root of the source tree.
+ */
+
+#include <memory>
+
+#include "absl/memory/memory.h"
+#include "api/test/simulated_network.h"
+#include "call/simulated_network.h"
+#include "rtc_base/event.h"
+#include "rtc_base/logging.h"
+#include "test/gmock.h"
+#include "test/gtest.h"
+#include "test/scenario/network/network_emulation.h"
+#include "test/scenario/network/network_emulation_manager.h"
+
+namespace webrtc {
+namespace test {
+
+class SocketReader : public sigslot::has_slots<> {
+ public:
+  explicit SocketReader(rtc::AsyncSocket* socket) : socket_(socket) {
+    socket_->SignalReadEvent.connect(this, &SocketReader::OnReadEvent);
+    size_ = 128 * 1024;
+    buf_ = new char[size_];
+  }
+  ~SocketReader() override { delete[] buf_; }
+
+  void OnReadEvent(rtc::AsyncSocket* socket) {
+    RTC_DCHECK(socket_ == socket);
+    int64_t timestamp;
+    len_ = socket_->Recv(buf_, size_, &timestamp);
+    {
+      rtc::CritScope crit(&lock_);
+      received_count_++;
+    }
+  }
+
+  int ReceivedCount() {
+    rtc::CritScope crit(&lock_);
+    return received_count_;
+  }
+
+ private:
+  rtc::AsyncSocket* socket_;
+  char* buf_;
+  size_t size_;
+  int len_;
+
+  rtc::CriticalSection lock_;
+  int received_count_ RTC_GUARDED_BY(lock_) = 0;
+};
+
+TEST(NetworkEmulationManagerTest, Run) {
+  NetworkEmulationManager network_manager(Clock::GetRealTimeClock());
+
+  EmulatedNetworkNode* alice_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  EmulatedNetworkNode* bob_node = network_manager.CreateEmulatedNode(
+      absl::make_unique<SimulatedNetwork>(BuiltInNetworkBehaviorConfig()));
+  EndpointNode* alice_endpoint =
+      network_manager.CreateEndpoint(rtc::IPAddress(1));
+  EndpointNode* bob_endpoint =
+      network_manager.CreateEndpoint(rtc::IPAddress(2));
+  network_manager.CreateRoute(alice_endpoint, {alice_node}, bob_endpoint);
+  network_manager.CreateRoute(bob_endpoint, {bob_node}, alice_endpoint);
+
+  auto* nt1 = network_manager.CreateNetworkThread({alice_endpoint});
+  auto* nt2 = network_manager.CreateNetworkThread({bob_endpoint});
+
+  network_manager.Start();
+
+  for (uint64_t j = 0; j < 2; j++) {
+    auto* s1 = nt1->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
+    auto* s2 = nt2->socketserver()->CreateAsyncSocket(AF_INET, SOCK_DGRAM);
+
+    SocketReader r1(s1);
+    SocketReader r2(s2);
+
+    rtc::SocketAddress a1(alice_endpoint->GetPeerLocalAddress(), 0);
+    rtc::SocketAddress a2(bob_endpoint->GetPeerLocalAddress(), 0);
+
+    s1->Bind(a1);
+    s2->Bind(a2);
+
+    s1->Connect(s1->GetLocalAddress());
+    s2->Connect(s2->GetLocalAddress());
+
+    rtc::CopyOnWriteBuffer data("Hello");
+    for (uint64_t i = 0; i < 1000; i++) {
+      s1->Send(data.data(), data.size());
+      s2->Send(data.data(), data.size());
+    }
+
+    rtc::Event wait;
+    wait.Wait(1000);
+    ASSERT_EQ(r1.ReceivedCount(), 1000);
+    ASSERT_EQ(r2.ReceivedCount(), 1000);
+
+    delete s1;
+    delete s2;
+  }
+
+  network_manager.Stop();
+}
+
+}  // namespace test
+}  // namespace webrtc