Make DegradedCall work without DCHECK failure

Ability to emulate degraded networks using DegradedCall has not
been covered by tests and it is crashing due to DCHECKs.
Fix threading issues so this no longer crash.

Bug: None
Change-Id: I9276dfb1f71762faa02146af0bfaab713bebb7f7
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/276060
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Daniel.L (Byoungchan) Lee <daniel.l@hpcnt.com>
Cr-Commit-Position: refs/heads/main@{#38216}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index efcb591..c56c557 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -320,6 +320,7 @@
     "../rtc_base:logging",
     "../rtc_base:macromagic",
     "../rtc_base:rate_limiter",
+    "../rtc_base:rtc_event",
     "../rtc_base:rtc_task_queue",
     "../rtc_base:safe_minmax",
     "../rtc_base:stringutils",
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index 8c3da57..0090d3a 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -14,17 +14,19 @@
 #include <utility>
 
 #include "absl/strings/string_view.h"
+#include "modules/rtp_rtcp/source/rtp_util.h"
+#include "rtc_base/event.h"
 
 namespace webrtc {
 
 DegradedCall::FakeNetworkPipeOnTaskQueue::FakeNetworkPipeOnTaskQueue(
     TaskQueueBase* task_queue,
-    const ScopedTaskSafety& task_safety,
+    rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
     Clock* clock,
     std::unique_ptr<NetworkBehaviorInterface> network_behavior)
     : clock_(clock),
       task_queue_(task_queue),
-      task_safety_(task_safety),
+      call_alive_(std::move(call_alive)),
       pipe_(clock, std::move(network_behavior)) {}
 
 void DegradedCall::FakeNetworkPipeOnTaskQueue::SendRtp(
@@ -61,13 +63,13 @@
     return false;
   }
 
-  task_queue_->PostTask(SafeTask(task_safety_.flag(), [this, time_to_next] {
+  task_queue_->PostTask(SafeTask(call_alive_, [this, time_to_next] {
     RTC_DCHECK_RUN_ON(task_queue_);
     int64_t next_process_time = *time_to_next + clock_->TimeInMilliseconds();
     if (!next_process_ms_ || next_process_time < *next_process_ms_) {
       next_process_ms_ = next_process_time;
       task_queue_->PostDelayedHighPrecisionTask(
-          SafeTask(task_safety_.flag(),
+          SafeTask(call_alive_,
                    [this] {
                      RTC_DCHECK_RUN_ON(task_queue_);
                      if (!Process()) {
@@ -126,12 +128,61 @@
   return true;
 }
 
+DegradedCall::ThreadedPacketReceiver::ThreadedPacketReceiver(
+    webrtc::TaskQueueBase* worker_thread,
+    webrtc::TaskQueueBase* network_thread,
+    rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
+    webrtc::PacketReceiver* receiver)
+    : worker_thread_(worker_thread),
+      network_thread_(network_thread),
+      call_alive_(std::move(call_alive)),
+      receiver_(receiver) {}
+
+DegradedCall::ThreadedPacketReceiver::~ThreadedPacketReceiver() = default;
+
+PacketReceiver::DeliveryStatus
+DegradedCall::ThreadedPacketReceiver::DeliverPacket(
+    MediaType media_type,
+    rtc::CopyOnWriteBuffer packet,
+    int64_t packet_time_us) {
+  // `Call::DeliverPacket` expects RTCP packets to be delivered from the
+  // network thread and RTP packets to be delivered from the worker thread.
+  // Because `FakeNetworkPipe` queues packets, the thread used when this packet
+  // is delivered to `DegradedCall::DeliverPacket` may differ from the thread
+  // used when this packet is delivered to
+  // `ThreadedPacketReceiver::DeliverPacket`. To solve this problem, always
+  // make sure that packets are sent in the correct thread.
+  if (IsRtcpPacket(packet)) {
+    if (!network_thread_->IsCurrent()) {
+      network_thread_->PostTask(
+          SafeTask(call_alive_, [receiver = receiver_, media_type,
+                                 packet = std::move(packet), packet_time_us]() {
+            receiver->DeliverPacket(media_type, std::move(packet),
+                                    packet_time_us);
+          }));
+      return DELIVERY_OK;
+    }
+  } else {
+    if (!worker_thread_->IsCurrent()) {
+      worker_thread_->PostTask([receiver = receiver_, media_type,
+                                packet = std::move(packet), packet_time_us]() {
+        receiver->DeliverPacket(media_type, std::move(packet), packet_time_us);
+      });
+      return DELIVERY_OK;
+    }
+  }
+
+  return receiver_->DeliverPacket(media_type, std::move(packet),
+                                  packet_time_us);
+}
+
 DegradedCall::DegradedCall(
     std::unique_ptr<Call> call,
     const std::vector<TimeScopedNetworkConfig>& send_configs,
     const std::vector<TimeScopedNetworkConfig>& receive_configs)
     : clock_(Clock::GetRealTimeClock()),
       call_(std::move(call)),
+      call_alive_(PendingTaskSafetyFlag::CreateDetached()),
       send_config_index_(0),
       send_configs_(send_configs),
       send_simulated_network_(nullptr),
@@ -142,11 +193,13 @@
     receive_simulated_network_ = network.get();
     receive_pipe_ =
         std::make_unique<webrtc::FakeNetworkPipe>(clock_, std::move(network));
-    receive_pipe_->SetReceiver(call_->Receiver());
+    packet_receiver_ = std::make_unique<ThreadedPacketReceiver>(
+        call_->worker_thread(), call_->network_thread(), call_alive_,
+        call_->Receiver());
+    receive_pipe_->SetReceiver(packet_receiver_.get());
     if (receive_configs_.size() > 1) {
       call_->network_thread()->PostDelayedTask(
-          SafeTask(task_safety_.flag(),
-                   [this] { UpdateReceiveNetworkConfig(); }),
+          SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }),
           receive_configs_[0].duration);
     }
   }
@@ -154,16 +207,29 @@
     auto network = std::make_unique<SimulatedNetwork>(send_configs_[0]);
     send_simulated_network_ = network.get();
     send_pipe_ = std::make_unique<FakeNetworkPipeOnTaskQueue>(
-        call_->network_thread(), task_safety_, clock_, std::move(network));
+        call_->network_thread(), call_alive_, clock_, std::move(network));
     if (send_configs_.size() > 1) {
       call_->network_thread()->PostDelayedTask(
-          SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+          SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }),
           send_configs_[0].duration);
     }
   }
 }
 
-DegradedCall::~DegradedCall() = default;
+DegradedCall::~DegradedCall() {
+  RTC_DCHECK_RUN_ON(call_->worker_thread());
+  // Thread synchronization is required to call `SetNotAlive`.
+  // Otherwise, when the `DegradedCall` object is destroyed but
+  // `SetNotAlive` has not yet been called,
+  // another Closure guarded by `call_alive_` may be called.
+  rtc::Event event;
+  call_->network_thread()->PostTask(
+      [flag = std::move(call_alive_), &event]() mutable {
+        flag->SetNotAlive();
+        event.Set();
+      });
+  event.Wait(rtc::Event::kForever);
+}
 
 AudioSendStream* DegradedCall::CreateAudioSendStream(
     const AudioSendStream::Config& config) {
@@ -352,7 +418,7 @@
   send_config_index_ = (send_config_index_ + 1) % send_configs_.size();
   send_simulated_network_->SetConfig(send_configs_[send_config_index_]);
   call_->network_thread()->PostDelayedTask(
-      SafeTask(task_safety_.flag(), [this] { UpdateSendNetworkConfig(); }),
+      SafeTask(call_alive_, [this] { UpdateSendNetworkConfig(); }),
       send_configs_[send_config_index_].duration);
 }
 
@@ -361,7 +427,7 @@
   receive_simulated_network_->SetConfig(
       receive_configs_[receive_config_index_]);
   call_->network_thread()->PostDelayedTask(
-      SafeTask(task_safety_.flag(), [this] { UpdateReceiveNetworkConfig(); }),
+      SafeTask(call_alive_, [this] { UpdateReceiveNetworkConfig(); }),
       receive_configs_[receive_config_index_].duration);
 }
 }  // namespace webrtc
diff --git a/call/degraded_call.h b/call/degraded_call.h
index fe5fd7c..5223022 100644
--- a/call/degraded_call.h
+++ b/call/degraded_call.h
@@ -122,7 +122,7 @@
    public:
     FakeNetworkPipeOnTaskQueue(
         TaskQueueBase* task_queue,
-        const ScopedTaskSafety& task_safety,
+        rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
         Clock* clock,
         std::unique_ptr<NetworkBehaviorInterface> network_behavior);
 
@@ -142,11 +142,30 @@
 
     Clock* const clock_;
     TaskQueueBase* const task_queue_;
-    const ScopedTaskSafety& task_safety_;
+    rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive_;
     FakeNetworkPipe pipe_;
     absl::optional<int64_t> next_process_ms_ RTC_GUARDED_BY(&task_queue_);
   };
 
+  class ThreadedPacketReceiver : public PacketReceiver {
+   public:
+    ThreadedPacketReceiver(webrtc::TaskQueueBase* worker_thread,
+                           webrtc::TaskQueueBase* network_thread,
+                           rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive,
+                           PacketReceiver* receiver);
+    ~ThreadedPacketReceiver() override;
+
+    DeliveryStatus DeliverPacket(MediaType media_type,
+                                 rtc::CopyOnWriteBuffer packet,
+                                 int64_t packet_time_us) override;
+
+   private:
+    webrtc::TaskQueueBase* const worker_thread_;
+    webrtc::TaskQueueBase* const network_thread_;
+    rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive_;
+    webrtc::PacketReceiver* const receiver_;
+  };
+
   // For audio/video send stream, a TransportAdapter instance is used to
   // intercept packets to be sent, and put them into a common FakeNetworkPipe
   // in such as way that they will eventually (unless dropped) be forwarded to
@@ -178,7 +197,8 @@
 
   Clock* const clock_;
   const std::unique_ptr<Call> call_;
-  ScopedTaskSafety task_safety_;
+  // For cancelling tasks on the network thread when DegradedCall is destroyed
+  rtc::scoped_refptr<PendingTaskSafetyFlag> call_alive_;
   size_t send_config_index_;
   const std::vector<TimeScopedNetworkConfig> send_configs_;
   SimulatedNetwork* send_simulated_network_;
@@ -192,6 +212,7 @@
   const std::vector<TimeScopedNetworkConfig> receive_configs_;
   SimulatedNetwork* receive_simulated_network_;
   std::unique_ptr<FakeNetworkPipe> receive_pipe_;
+  std::unique_ptr<ThreadedPacketReceiver> packet_receiver_;
 };
 
 }  // namespace webrtc
diff --git a/pc/peer_connection_field_trial_tests.cc b/pc/peer_connection_field_trial_tests.cc
index 528b6ba..0e6e451 100644
--- a/pc/peer_connection_field_trial_tests.cc
+++ b/pc/peer_connection_field_trial_tests.cc
@@ -25,15 +25,35 @@
 #include "pc/peer_connection_wrapper.h"
 #include "pc/session_description.h"
 #include "pc/test/fake_audio_capture_module.h"
+#include "pc/test/frame_generator_capturer_video_track_source.h"
 #include "pc/test/peer_connection_test_wrapper.h"
+#include "rtc_base/gunit.h"
 #include "rtc_base/internal/default_socket_server.h"
 #include "rtc_base/physical_socket_server.h"
 #include "rtc_base/thread.h"
 #include "test/gtest.h"
 #include "test/scoped_key_value_config.h"
 
+#ifdef WEBRTC_ANDROID
+#include "pc/test/android_test_initializer.h"
+#endif
+
 namespace webrtc {
 
+namespace {
+static const int kDefaultTimeoutMs = 5000;
+
+bool AddIceCandidates(PeerConnectionWrapper* peer,
+                      std::vector<const IceCandidateInterface*> candidates) {
+  for (const auto candidate : candidates) {
+    if (!peer->pc()->AddIceCandidate(candidate)) {
+      return false;
+    }
+  }
+  return true;
+}
+}  // namespace
+
 using RTCConfiguration = PeerConnectionInterface::RTCConfiguration;
 
 class PeerConnectionFieldTrialTest : public ::testing::Test {
@@ -41,8 +61,12 @@
   typedef std::unique_ptr<PeerConnectionWrapper> WrapperPtr;
 
   PeerConnectionFieldTrialTest()
-      : socket_server_(rtc::CreateDefaultSocketServer()),
+      : clock_(Clock::GetRealTimeClock()),
+        socket_server_(rtc::CreateDefaultSocketServer()),
         main_thread_(socket_server_.get()) {
+#ifdef WEBRTC_ANDROID
+    InitializeAndroidObjects();
+#endif
     webrtc::PeerConnectionInterface::IceServer ice_server;
     ice_server.uri = "stun:stun.l.google.com:19302";
     config_.servers.push_back(ice_server);
@@ -54,8 +78,6 @@
   void CreatePCFactory(std::unique_ptr<FieldTrialsView> field_trials) {
     PeerConnectionFactoryDependencies pcf_deps;
     pcf_deps.signaling_thread = rtc::Thread::Current();
-    pcf_deps.worker_thread = rtc::Thread::Current();
-    pcf_deps.network_thread = rtc::Thread::Current();
     pcf_deps.trials = std::move(field_trials);
     pcf_deps.task_queue_factory = CreateDefaultTaskQueueFactory();
     pcf_deps.call_factory = webrtc::CreateCallFactory();
@@ -66,6 +88,13 @@
     webrtc::SetMediaEngineDefaults(&media_deps);
     pcf_deps.media_engine = cricket::CreateMediaEngine(std::move(media_deps));
     pc_factory_ = CreateModularPeerConnectionFactory(std::move(pcf_deps));
+
+    // Allow ADAPTER_TYPE_LOOPBACK to create PeerConnections with loopback in
+    // this test.
+    RTC_DCHECK(pc_factory_);
+    PeerConnectionFactoryInterface::Options options;
+    options.network_ignore_mask = 0;
+    pc_factory_->SetOptions(options);
   }
 
   WrapperPtr CreatePeerConnection() {
@@ -79,6 +108,7 @@
         pc_factory_, result.MoveValue(), std::move(observer));
   }
 
+  Clock* const clock_;
   std::unique_ptr<rtc::SocketServer> socket_server_;
   rtc::AutoSocketServerThread main_thread_;
   rtc::scoped_refptr<PeerConnectionFactoryInterface> pc_factory_ = nullptr;
@@ -188,4 +218,50 @@
   EXPECT_TRUE(found2);
 }
 
+// Test that the ability to emulate degraded networks works without crashing.
+TEST_F(PeerConnectionFieldTrialTest, ApplyFakeNetworkConfig) {
+  std::unique_ptr<test::ScopedKeyValueConfig> field_trials =
+      std::make_unique<test::ScopedKeyValueConfig>(
+          "WebRTC-FakeNetworkSendConfig/link_capacity_kbps:500/"
+          "WebRTC-FakeNetworkReceiveConfig/loss_percent:1/");
+
+  CreatePCFactory(std::move(field_trials));
+
+  WrapperPtr caller = CreatePeerConnection();
+  FrameGeneratorCapturerVideoTrackSource::Config config;
+  auto video_track_source =
+      rtc::make_ref_counted<FrameGeneratorCapturerVideoTrackSource>(
+          config, clock_, /*is_screencast=*/false);
+  caller->AddTrack(
+      pc_factory_->CreateVideoTrack("v", video_track_source.get()));
+  WrapperPtr callee = CreatePeerConnection();
+
+  ASSERT_TRUE(callee->SetRemoteDescription(caller->CreateOfferAndSetAsLocal()));
+  ASSERT_TRUE(
+      caller->SetRemoteDescription(callee->CreateAnswerAndSetAsLocal()));
+
+  // Do the SDP negotiation, and also exchange ice candidates.
+  ASSERT_TRUE(caller->ExchangeOfferAnswerWith(callee.get()));
+  ASSERT_TRUE_WAIT(
+      caller->signaling_state() == PeerConnectionInterface::kStable,
+      kDefaultTimeoutMs);
+  ASSERT_TRUE_WAIT(caller->IsIceGatheringDone(), kDefaultTimeoutMs);
+  ASSERT_TRUE_WAIT(callee->IsIceGatheringDone(), kDefaultTimeoutMs);
+
+  // Connect an ICE candidate pairs.
+  ASSERT_TRUE(
+      AddIceCandidates(callee.get(), caller->observer()->GetAllCandidates()));
+  ASSERT_TRUE(
+      AddIceCandidates(caller.get(), callee->observer()->GetAllCandidates()));
+
+  // This means that ICE and DTLS are connected.
+  ASSERT_TRUE_WAIT(callee->IsIceConnected(), kDefaultTimeoutMs);
+  ASSERT_TRUE_WAIT(caller->IsIceConnected(), kDefaultTimeoutMs);
+
+  // Send packets for kDefaultTimeoutMs
+  // For now, whether this field trial works or not is checked by
+  // whether a crash occurs. Additional validation can be added later.
+  WAIT(false, kDefaultTimeoutMs);
+}
+
 }  // namespace webrtc