Calculate next process time in simulated network.

Currently there's an implicit requirement that users of
SimulatedNetwork should call it repeatedly, even if the return value
of NextDeliveryTimeUs is unset.

With this change, it will indicate that there might be a delivery in
5 ms at any time there are packets in queue. Which results in unchanged
behavior compared to current usage but allows new users to expect
robust behavior.

Bug: webrtc:9510
Change-Id: I45b8b5f1f0d3d13a8ec9b163d4011c5f01a53069
Reviewed-on: https://webrtc-review.googlesource.com/c/120402
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Christoffer Rodbro <crodbro@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26617}
diff --git a/call/BUILD.gn b/call/BUILD.gn
index 38b2f41..9011094 100644
--- a/call/BUILD.gn
+++ b/call/BUILD.gn
@@ -228,6 +228,7 @@
     "../logging:rtc_event_rtp_rtcp",
     "../logging:rtc_event_video",
     "../logging:rtc_stream_config",
+    "../modules:module_api",
     "../modules/bitrate_controller",
     "../modules/congestion_controller",
     "../modules/pacing",
@@ -300,7 +301,6 @@
   deps = [
     ":call_interfaces",
     "../api:simulated_network_api",
-    "../modules:module_api",
   ]
 }
 
@@ -317,7 +317,6 @@
     "../api:libjingle_peerconnection_api",
     "../api:simulated_network_api",
     "../api:transport_api",
-    "../modules:module_api",
     "../modules/utility",
     "../rtc_base:checks",
     "../rtc_base:rtc_base_approved",
diff --git a/call/degraded_call.cc b/call/degraded_call.cc
index c066d5d..4b6d1af 100644
--- a/call/degraded_call.cc
+++ b/call/degraded_call.cc
@@ -15,6 +15,55 @@
 #include "rtc_base/location.h"
 
 namespace webrtc {
+
+namespace {
+constexpr int64_t kDoNothingProcessIntervalMs = 5000;
+}  // namespace
+
+FakeNetworkPipeModule::~FakeNetworkPipeModule() = default;
+
+FakeNetworkPipeModule::FakeNetworkPipeModule(
+    Clock* clock,
+    std::unique_ptr<NetworkBehaviorInterface> network_behavior,
+    Transport* transport)
+    : pipe_(clock, std::move(network_behavior), transport) {}
+
+void FakeNetworkPipeModule::SendRtp(const uint8_t* packet,
+                                    size_t length,
+                                    const PacketOptions& options) {
+  pipe_.SendRtp(packet, length, options);
+  MaybeResumeProcess();
+}
+
+void FakeNetworkPipeModule::SendRtcp(const uint8_t* packet, size_t length) {
+  pipe_.SendRtcp(packet, length);
+  MaybeResumeProcess();
+}
+
+void FakeNetworkPipeModule::MaybeResumeProcess() {
+  rtc::CritScope cs(&process_thread_lock_);
+  if (!pending_process_ && pipe_.TimeUntilNextProcess() && process_thread_) {
+    process_thread_->WakeUp(nullptr);
+  }
+}
+
+int64_t FakeNetworkPipeModule::TimeUntilNextProcess() {
+  auto delay = pipe_.TimeUntilNextProcess();
+  rtc::CritScope cs(&process_thread_lock_);
+  pending_process_ = delay.has_value();
+  return delay.value_or(kDoNothingProcessIntervalMs);
+}
+
+void FakeNetworkPipeModule::ProcessThreadAttached(
+    ProcessThread* process_thread) {
+  rtc::CritScope cs(&process_thread_lock_);
+  process_thread_ = process_thread;
+}
+
+void FakeNetworkPipeModule::Process() {
+  pipe_.Process();
+}
+
 DegradedCall::DegradedCall(
     std::unique_ptr<Call> call,
     absl::optional<BuiltInNetworkBehaviorConfig> send_config,
@@ -72,8 +121,8 @@
   if (send_config_ && !send_pipe_) {
     auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
     send_simulated_network_ = network.get();
-    send_pipe_ = absl::make_unique<FakeNetworkPipe>(clock_, std::move(network),
-                                                    config.send_transport);
+    send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
+        clock_, std::move(network), config.send_transport);
     config.send_transport = this;
     send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
   }
@@ -89,8 +138,8 @@
   if (send_config_ && !send_pipe_) {
     auto network = absl::make_unique<SimulatedNetwork>(*send_config_);
     send_simulated_network_ = network.get();
-    send_pipe_ = absl::make_unique<FakeNetworkPipe>(clock_, std::move(network),
-                                                    config.send_transport);
+    send_pipe_ = absl::make_unique<FakeNetworkPipeModule>(
+        clock_, std::move(network), config.send_transport);
     config.send_transport = this;
     send_process_thread_->RegisterModule(send_pipe_.get(), RTC_FROM_HERE);
   }
diff --git a/call/degraded_call.h b/call/degraded_call.h
index 8f062c3..89eafdb 100644
--- a/call/degraded_call.h
+++ b/call/degraded_call.h
@@ -32,6 +32,7 @@
 #include "call/simulated_network.h"
 #include "call/video_receive_stream.h"
 #include "call/video_send_stream.h"
+#include "modules/include/module.h"
 #include "modules/utility/include/process_thread.h"
 #include "rtc_base/bitrate_allocation_strategy.h"
 #include "rtc_base/copy_on_write_buffer.h"
@@ -39,6 +40,30 @@
 #include "system_wrappers/include/clock.h"
 
 namespace webrtc {
+class FakeNetworkPipeModule : public Module {
+ public:
+  FakeNetworkPipeModule(
+      Clock* clock,
+      std::unique_ptr<NetworkBehaviorInterface> network_behavior,
+      Transport* transport);
+  ~FakeNetworkPipeModule() override;
+  void SendRtp(const uint8_t* packet,
+               size_t length,
+               const PacketOptions& options);
+  void SendRtcp(const uint8_t* packet, size_t length);
+
+  // Implements Module interface
+  int64_t TimeUntilNextProcess() override;
+  void ProcessThreadAttached(ProcessThread* process_thread) override;
+  void Process() override;
+
+ private:
+  void MaybeResumeProcess();
+  FakeNetworkPipe pipe_;
+  rtc::CriticalSection process_thread_lock_;
+  ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
+  bool pending_process_ RTC_GUARDED_BY(process_thread_lock_) = false;
+};
 
 class DegradedCall : public Call, private Transport, private PacketReceiver {
  public:
@@ -111,7 +136,7 @@
   const absl::optional<BuiltInNetworkBehaviorConfig> send_config_;
   const std::unique_ptr<ProcessThread> send_process_thread_;
   SimulatedNetwork* send_simulated_network_;
-  std::unique_ptr<FakeNetworkPipe> send_pipe_;
+  std::unique_ptr<FakeNetworkPipeModule> send_pipe_;
   size_t num_send_streams_;
 
   const absl::optional<BuiltInNetworkBehaviorConfig> receive_config_;
diff --git a/call/fake_network_pipe.cc b/call/fake_network_pipe.cc
index b5c0cb5..46adcb4 100644
--- a/call/fake_network_pipe.cc
+++ b/call/fake_network_pipe.cc
@@ -24,7 +24,6 @@
 namespace webrtc {
 
 namespace {
-constexpr int64_t kDefaultProcessIntervalMs = 5;
 constexpr int64_t kLogIntervalMs = 5000;
 }  // namespace
 
@@ -167,12 +166,6 @@
     packets_in_flight_.pop_back();
     ++dropped_packets_;
   }
-  if (network_behavior_->NextDeliveryTimeUs()) {
-    rtc::CritScope crit(&process_thread_lock_);
-    if (process_thread_)
-      process_thread_->WakeUp(nullptr);
-  }
-
   return sent;
 }
 
@@ -292,19 +285,14 @@
   }
 }
 
-int64_t FakeNetworkPipe::TimeUntilNextProcess() {
+absl::optional<int64_t> FakeNetworkPipe::TimeUntilNextProcess() {
   rtc::CritScope crit(&process_lock_);
   absl::optional<int64_t> delivery_us = network_behavior_->NextDeliveryTimeUs();
   if (delivery_us) {
     int64_t delay_us = *delivery_us - clock_->TimeInMicroseconds();
     return std::max<int64_t>((delay_us + 500) / 1000, 0);
   }
-  return kDefaultProcessIntervalMs;
-}
-
-void FakeNetworkPipe::ProcessThreadAttached(ProcessThread* process_thread) {
-  rtc::CritScope cs(&process_thread_lock_);
-  process_thread_ = process_thread;
+  return absl::nullopt;
 }
 
 bool FakeNetworkPipe::HasTransport() const {
diff --git a/call/fake_network_pipe.h b/call/fake_network_pipe.h
index 2c41dbf..661815b 100644
--- a/call/fake_network_pipe.h
+++ b/call/fake_network_pipe.h
@@ -86,8 +86,7 @@
 
 // Class faking a network link, internally is uses an implementation of a
 // SimulatedNetworkInterface to simulate network behavior.
-class FakeNetworkPipe : public webrtc::SimulatedPacketReceiverInterface,
-                        public Transport {
+class FakeNetworkPipe : public SimulatedPacketReceiverInterface {
  public:
   // Will keep |network_behavior| alive while pipe is alive itself.
   // Use these constructors if you plan to insert packets using DeliverPacket().
@@ -119,8 +118,8 @@
   // constructor.
   bool SendRtp(const uint8_t* packet,
                size_t length,
-               const PacketOptions& options) override;
-  bool SendRtcp(const uint8_t* packet, size_t length) override;
+               const PacketOptions& options);
+  bool SendRtcp(const uint8_t* packet, size_t length);
 
   // Implements the PacketReceiver interface. When/if packets are delivered,
   // they will be passed directly to the receiver instance given in
@@ -138,8 +137,7 @@
   // Processes the network queues and trigger PacketReceiver::IncomingPacket for
   // packets ready to be delivered.
   void Process() override;
-  int64_t TimeUntilNextProcess() override;
-  void ProcessThreadAttached(ProcessThread* process_thread) override;
+  absl::optional<int64_t> TimeUntilNextProcess() override;
 
   // Get statistics.
   float PercentageLoss();
@@ -193,10 +191,6 @@
   // |process_lock| guards the data structures involved in delay and loss
   // processes, such as the packet queues.
   rtc::CriticalSection process_lock_;
-
-  rtc::CriticalSection process_thread_lock_;
-  ProcessThread* process_thread_ RTC_GUARDED_BY(process_thread_lock_) = nullptr;
-
   // Packets  are added at the back of the deque, this makes the deque ordered
   // by increasing send time. The common case when removing packets from the
   // deque is removing early packets, which will be close to the front of the
diff --git a/call/simulated_network.cc b/call/simulated_network.cc
index 0884b29..c80255f 100644
--- a/call/simulated_network.cc
+++ b/call/simulated_network.cc
@@ -20,6 +20,9 @@
 #include "rtc_base/checks.h"
 
 namespace webrtc {
+namespace {
+constexpr int64_t kDefaultProcessDelayUs = 5000;
+}
 
 SimulatedNetwork::SimulatedNetwork(SimulatedNetwork::Config config,
                                    uint64_t random_seed)
@@ -76,15 +79,16 @@
   // calculated in UpdateCapacityQueue.
   queue_size_bytes_ += packet.size;
   capacity_link_.push({packet, packet.send_time_us});
+  if (!next_process_time_us_) {
+    next_process_time_us_ = packet.send_time_us + kDefaultProcessDelayUs;
+  }
 
   return true;
 }
 
 absl::optional<int64_t> SimulatedNetwork::NextDeliveryTimeUs() const {
   RTC_DCHECK_RUNS_SERIALIZED(&process_checker_);
-  if (!delay_link_.empty())
-    return delay_link_.begin()->arrival_time_us;
-  return absl::nullopt;
+  return next_process_time_us_;
 }
 
 void SimulatedNetwork::UpdateCapacityQueue(ConfigState state,
@@ -198,6 +202,14 @@
         PacketDeliveryInfo(packet_info.packet, packet_info.arrival_time_us));
     delay_link_.pop_front();
   }
+
+  if (!delay_link_.empty()) {
+    next_process_time_us_ = delay_link_.front().arrival_time_us;
+  } else if (!capacity_link_.empty()) {
+    next_process_time_us_ = receive_time_us + kDefaultProcessDelayUs;
+  } else {
+    next_process_time_us_.reset();
+  }
   return packets_to_deliver;
 }
 
diff --git a/call/simulated_network.h b/call/simulated_network.h
index 6adb412..632eb5d 100644
--- a/call/simulated_network.h
+++ b/call/simulated_network.h
@@ -87,6 +87,8 @@
   int64_t pending_drain_bits_ RTC_GUARDED_BY(process_checker_) = 0;
   absl::optional<int64_t> last_capacity_link_visit_us_
       RTC_GUARDED_BY(process_checker_);
+  absl::optional<int64_t> next_process_time_us_
+      RTC_GUARDED_BY(process_checker_);
 };
 
 }  // namespace webrtc
diff --git a/call/simulated_packet_receiver.h b/call/simulated_packet_receiver.h
index 03d7e96..2db46e8 100644
--- a/call/simulated_packet_receiver.h
+++ b/call/simulated_packet_receiver.h
@@ -13,13 +13,12 @@
 
 #include "api/test/simulated_network.h"
 #include "call/packet_receiver.h"
-#include "modules/include/module.h"
 
 namespace webrtc {
 
 // Private API that is fixing surface between DirectTransport and underlying
 // network conditions simulation implementation.
-class SimulatedPacketReceiverInterface : public PacketReceiver, public Module {
+class SimulatedPacketReceiverInterface : public PacketReceiver {
  public:
   // Must not be called in parallel with DeliverPacket or Process.
   // Destination receiver will be injected with this method
@@ -27,6 +26,15 @@
 
   // Reports average packet delay.
   virtual int AverageDelay() = 0;
+
+  // Process any pending tasks such as timeouts.
+  // Called on a worker thread.
+  virtual void Process() = 0;
+
+  // Returns the time until next process or nullopt to indicate that the next
+  // process time is unknown. If the next process time is unknown, this should
+  // be checked again any time a packet is enqueued.
+  virtual absl::optional<int64_t> TimeUntilNextProcess() = 0;
 };
 
 }  // namespace webrtc
diff --git a/call/test/fake_network_pipe_unittest.cc b/call/test/fake_network_pipe_unittest.cc
index 9f2a663..b8c7e56 100644
--- a/call/test/fake_network_pipe_unittest.cc
+++ b/call/test/fake_network_pipe_unittest.cc
@@ -259,7 +259,7 @@
 
   // Check that all the packets were sent.
   EXPECT_EQ(static_cast<size_t>(2 * kNumPackets), pipe->SentPackets());
-  fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess());
+  fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess());
   EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0);
   pipe->Process();
 }
@@ -307,7 +307,7 @@
 
   // Check that all the packets were sent.
   EXPECT_EQ(static_cast<size_t>(kNumPackets), pipe->SentPackets());
-  fake_clock_.AdvanceTimeMilliseconds(pipe->TimeUntilNextProcess());
+  fake_clock_.AdvanceTimeMilliseconds(*pipe->TimeUntilNextProcess());
   EXPECT_CALL(receiver, DeliverPacket(_, _, _)).Times(0);
   pipe->Process();
 }
diff --git a/test/direct_transport.cc b/test/direct_transport.cc
index fd73691..b755449 100644
--- a/test/direct_transport.cc
+++ b/test/direct_transport.cc
@@ -50,19 +50,18 @@
 }
 
 DirectTransport::~DirectTransport() {
-  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
-  // Constructor updates |next_scheduled_task_|, so it's guaranteed to
-  // be initialized.
-  task_queue_->CancelTask(next_scheduled_task_);
+  if (next_process_task_)
+    task_queue_->CancelTask(*next_process_task_);
 }
 
 void DirectTransport::StopSending() {
-  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
-  task_queue_->CancelTask(next_scheduled_task_);
+  rtc::CritScope cs(&process_lock_);
+  if (next_process_task_)
+    task_queue_->CancelTask(*next_process_task_);
 }
 
 void DirectTransport::SetReceiver(PacketReceiver* receiver) {
-  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
+  rtc::CritScope cs(&process_lock_);
   fake_network_->SetReceiver(receiver);
 }
 
@@ -92,6 +91,9 @@
   int64_t send_time = clock_->TimeInMicroseconds();
   fake_network_->DeliverPacket(media_type, rtc::CopyOnWriteBuffer(data, length),
                                send_time);
+  rtc::CritScope cs(&process_lock_);
+  if (!next_process_task_)
+    ProcessPackets();
 }
 
 int DirectTransport::GetAverageDelayMs() {
@@ -104,17 +106,20 @@
     send_call_->SignalChannelNetworkState(MediaType::AUDIO, kNetworkUp);
     send_call_->SignalChannelNetworkState(MediaType::VIDEO, kNetworkUp);
   }
-  SendPackets();
 }
 
-void DirectTransport::SendPackets() {
-  RTC_DCHECK_CALLED_SEQUENTIALLY(&sequence_checker_);
-
-  fake_network_->Process();
-
-  int64_t delay_ms = fake_network_->TimeUntilNextProcess();
-  next_scheduled_task_ =
-      task_queue_->PostDelayedTask([this]() { SendPackets(); }, delay_ms);
+void DirectTransport::ProcessPackets() {
+  next_process_task_.reset();
+  auto delay_ms = fake_network_->TimeUntilNextProcess();
+  if (delay_ms) {
+    next_process_task_ = task_queue_->PostDelayedTask(
+        [this]() {
+          fake_network_->Process();
+          rtc::CritScope cs(&process_lock_);
+          ProcessPackets();
+        },
+        *delay_ms);
+  }
 }
 }  // namespace test
 }  // namespace webrtc
diff --git a/test/direct_transport.h b/test/direct_transport.h
index f926ec5..d70748f 100644
--- a/test/direct_transport.h
+++ b/test/direct_transport.h
@@ -60,7 +60,7 @@
   int GetAverageDelayMs();
 
  private:
-  void SendPackets();
+  void ProcessPackets() RTC_EXCLUSIVE_LOCKS_REQUIRED(&process_lock_);
   void SendPacket(const uint8_t* data, size_t length);
   void Start();
 
@@ -68,13 +68,14 @@
   Clock* const clock_;
 
   SingleThreadedTaskQueueForTesting* const task_queue_;
-  SingleThreadedTaskQueueForTesting::TaskId next_scheduled_task_
-      RTC_GUARDED_BY(&sequence_checker_);
+
+  rtc::CriticalSection process_lock_;
+  absl::optional<SingleThreadedTaskQueueForTesting::TaskId> next_process_task_
+      RTC_GUARDED_BY(&process_lock_);
 
   const Demuxer demuxer_;
   const std::unique_ptr<SimulatedPacketReceiverInterface> fake_network_;
 
-  rtc::SequencedTaskChecker sequence_checker_;
 };
 }  // namespace test
 }  // namespace webrtc