Add periodic compound packet sending to RtcpTransceiver

Bug: webrtc:8239
Change-Id: I1511db63a15e8c5101a933e55e66d3877ff963be
Reviewed-on: https://webrtc-review.googlesource.com/15440
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20480}
diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn
index b3e9f23..52c40e2 100644
--- a/modules/rtp_rtcp/BUILD.gn
+++ b/modules/rtp_rtcp/BUILD.gn
@@ -232,6 +232,8 @@
     "../../api:array_view",
     "../../api:transport_api",
     "../../rtc_base:rtc_base_approved",
+    "../../rtc_base:rtc_task_queue",
+    "../../rtc_base:weak_ptr",
   ]
 }
 
@@ -390,11 +392,13 @@
       "../..:webrtc_common",
       "../../api:array_view",
       "../../api:libjingle_peerconnection_api",
+      "../../api:optional",
       "../../api:transport_api",
       "../../call:rtp_receiver",
       "../../common_video:common_video",
       "../../logging:rtc_event_log_api",
       "../../rtc_base:rtc_base_approved",
+      "../../rtc_base:rtc_task_queue",
       "../../system_wrappers:system_wrappers",
       "../../test:field_trial",
       "../../test:rtp_test_utils",
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc
index c129d17..0cd0abe 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_config.cc
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.cc
@@ -43,15 +43,20 @@
                   << " more than " << IP_PACKET_SIZE << " is unsupported.";
     return false;
   }
-  if (outgoing_transport == nullptr) {
+  if (!outgoing_transport) {
     LOG(LS_ERROR) << debug_id << "outgoing transport must be set";
     return false;
   }
-  if (min_periodic_report_ms <= 0) {
-    LOG(LS_ERROR) << debug_id << "period " << min_periodic_report_ms
+  if (report_period_ms <= 0) {
+    LOG(LS_ERROR) << debug_id << "period " << report_period_ms
                   << "ms between reports should be positive.";
     return false;
   }
+  if (schedule_periodic_compound_packets && !task_queue) {
+    LOG(LS_ERROR) << debug_id
+                  << "missing task queue for periodic compound packets";
+    return false;
+  }
   // TODO(danilchap): Remove or update the warning when RtcpTransceiver supports
   // send-only sessions.
   if (receive_statistics == nullptr)
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_config.h b/modules/rtp_rtcp/source/rtcp_transceiver_config.h
index 187ad18..79f18c8 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_config.h
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_config.h
@@ -13,6 +13,8 @@
 
 #include <string>
 
+#include "rtc_base/task_queue.h"
+
 namespace webrtc {
 class ReceiveStatisticsProvider;
 class Transport;
@@ -43,11 +45,25 @@
   // Transport to send rtcp packets to. Should be set.
   Transport* outgoing_transport = nullptr;
 
-  // Minimum period to send receiver reports and attached messages.
-  int min_periodic_report_ms = 1000;
+  // Queue for scheduling delayed tasks, e.g. sending periodic compound packets.
+  rtc::TaskQueue* task_queue = nullptr;
 
   // Rtcp report block generator for outgoing receiver reports.
   ReceiveStatisticsProvider* receive_statistics = nullptr;
+
+  //
+  // Tuning parameters.
+  //
+  // Delay before 1st periodic compound packet.
+  int initial_report_delay_ms = 500;
+
+  // Period between periodic compound packets.
+  int report_period_ms = 1000;
+
+  //
+  // Flags for features and experiments.
+  //
+  bool schedule_periodic_compound_packets = true;
 };
 
 }  // namespace webrtc
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
index c8db3c8..4c4ae95 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.cc
@@ -21,6 +21,8 @@
 #include "modules/rtp_rtcp/source/rtcp_packet/report_block.h"
 #include "modules/rtp_rtcp/source/rtcp_packet/sdes.h"
 #include "rtc_base/checks.h"
+#include "rtc_base/ptr_util.h"
+#include "rtc_base/task_queue.h"
 
 namespace webrtc {
 namespace {
@@ -66,13 +68,55 @@
 }  // namespace
 
 RtcpTransceiverImpl::RtcpTransceiverImpl(const RtcpTransceiverConfig& config)
-    : config_(config) {
+    : config_(config), ptr_factory_(this) {
   RTC_CHECK(config_.Validate());
+  if (config_.schedule_periodic_compound_packets)
+    ReschedulePeriodicCompoundPackets(config_.initial_report_delay_ms);
 }
 
 RtcpTransceiverImpl::~RtcpTransceiverImpl() = default;
 
 void RtcpTransceiverImpl::SendCompoundPacket() {
+  SendPacket();
+  if (config_.schedule_periodic_compound_packets)
+    ReschedulePeriodicCompoundPackets(config_.report_period_ms);
+}
+
+void RtcpTransceiverImpl::ReschedulePeriodicCompoundPackets(int64_t delay_ms) {
+  class SendPeriodicCompoundPacket : public rtc::QueuedTask {
+   public:
+    SendPeriodicCompoundPacket(rtc::TaskQueue* task_queue,
+                               rtc::WeakPtr<RtcpTransceiverImpl> ptr)
+        : task_queue_(task_queue), ptr_(std::move(ptr)) {}
+    bool Run() override {
+      RTC_DCHECK(task_queue_->IsCurrent());
+      if (!ptr_)
+        return true;
+      ptr_->SendPacket();
+      task_queue_->PostDelayedTask(rtc::WrapUnique(this),
+                                   ptr_->config_.report_period_ms);
+      return false;
+    }
+
+   private:
+    rtc::TaskQueue* const task_queue_;
+    const rtc::WeakPtr<RtcpTransceiverImpl> ptr_;
+  };
+
+  RTC_DCHECK(config_.schedule_periodic_compound_packets);
+  RTC_DCHECK(config_.task_queue->IsCurrent());
+
+  // Stop existent send task if there is one.
+  ptr_factory_.InvalidateWeakPtrs();
+  auto task = rtc::MakeUnique<SendPeriodicCompoundPacket>(
+      config_.task_queue, ptr_factory_.GetWeakPtr());
+  if (delay_ms > 0)
+    config_.task_queue->PostDelayedTask(std::move(task), delay_ms);
+  else
+    config_.task_queue->PostTask(std::move(task));
+}
+
+void RtcpTransceiverImpl::SendPacket() {
   PacketSender sender(config_.outgoing_transport, config_.max_packet_size);
 
   rtcp::ReceiverReport rr;
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h
index decb4fe..2a9e3d3 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_impl.h
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl.h
@@ -17,6 +17,7 @@
 #include "api/array_view.h"
 #include "modules/rtp_rtcp/source/rtcp_transceiver_config.h"
 #include "rtc_base/constructormagic.h"
+#include "rtc_base/weak_ptr.h"
 
 namespace webrtc {
 //
@@ -32,8 +33,14 @@
   void SendCompoundPacket();
 
  private:
+  void ReschedulePeriodicCompoundPackets(int64_t delay_ms);
+  // Sends RTCP packets.
+  void SendPacket();
+
   const RtcpTransceiverConfig config_;
 
+  rtc::WeakPtrFactory<RtcpTransceiverImpl> ptr_factory_;
+
   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RtcpTransceiverImpl);
 };
 
diff --git a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc
index ca335ec..9e94644 100644
--- a/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc
+++ b/modules/rtp_rtcp/source/rtcp_transceiver_impl_unittest.cc
@@ -13,7 +13,9 @@
 #include <vector>
 
 #include "modules/rtp_rtcp/include/receive_statistics.h"
+#include "rtc_base/event.h"
 #include "rtc_base/ptr_util.h"
+#include "rtc_base/task_queue.h"
 #include "test/gmock.h"
 #include "test/gtest.h"
 #include "test/mock_transport.h"
@@ -36,6 +38,119 @@
   MOCK_METHOD1(RtcpReportBlocks, std::vector<ReportBlock>(size_t));
 };
 
+// Helper to wait for an rtcp packet produced on a different thread/task queue.
+class FakeRtcpTransport : public webrtc::Transport {
+ public:
+  FakeRtcpTransport() : sent_rtcp_(false, false) {}
+  bool SendRtcp(const uint8_t* data, size_t size) override {
+    sent_rtcp_.Set();
+    return true;
+  }
+  bool SendRtp(const uint8_t*, size_t, const webrtc::PacketOptions&) override {
+    ADD_FAILURE() << "RtcpTransciver shouldn't send rtp packets.";
+    return true;
+  }
+
+  // Returns true if packet was received by this transport before timeout,
+  bool WaitPacket(int64_t timeout_ms) { return sent_rtcp_.Wait(timeout_ms); }
+
+ private:
+  rtc::Event sent_rtcp_;
+};
+
+// Posting delayed tasks doesn't promise high precision.
+constexpr int64_t kTaskQueuePrecisionMs = 15;
+
+TEST(RtcpTransceiverImplTest, DelaysSendingFirstCompondPacket) {
+  rtc::TaskQueue queue("rtcp");
+  FakeRtcpTransport transport;
+  RtcpTransceiverConfig config;
+  config.outgoing_transport = &transport;
+  config.initial_report_delay_ms = 10;
+  config.task_queue = &queue;
+  rtc::Optional<RtcpTransceiverImpl> rtcp_transceiver;
+
+  int64_t started_ms = rtc::TimeMillis();
+  queue.PostTask([&] { rtcp_transceiver.emplace(config); });
+  EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms +
+                                   kTaskQueuePrecisionMs));
+
+  EXPECT_GE(rtc::TimeMillis() - started_ms, config.initial_report_delay_ms);
+
+  // Cleanup.
+  rtc::Event done(false, false);
+  queue.PostTask([&] {
+    rtcp_transceiver.reset();
+    done.Set();
+  });
+  ASSERT_TRUE(done.Wait(/*milliseconds=*/100));
+}
+
+TEST(RtcpTransceiverImplTest, PeriodicallySendsPackets) {
+  rtc::TaskQueue queue("rtcp");
+  FakeRtcpTransport transport;
+  RtcpTransceiverConfig config;
+  config.outgoing_transport = &transport;
+  config.initial_report_delay_ms = 0;
+  config.report_period_ms = 10;
+  config.task_queue = &queue;
+  rtc::Optional<RtcpTransceiverImpl> rtcp_transceiver;
+  queue.PostTask([&] { rtcp_transceiver.emplace(config); });
+
+  EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms +
+                                   kTaskQueuePrecisionMs));
+  int64_t time_of_1st_packet_ms = rtc::TimeMillis();
+  EXPECT_TRUE(
+      transport.WaitPacket(config.report_period_ms + kTaskQueuePrecisionMs));
+  int64_t time_of_2nd_packet_ms = rtc::TimeMillis();
+
+  EXPECT_GE(time_of_2nd_packet_ms - time_of_1st_packet_ms,
+            config.report_period_ms);
+
+  // Cleanup.
+  rtc::Event done(false, false);
+  queue.PostTask([&] {
+    rtcp_transceiver.reset();
+    done.Set();
+  });
+  ASSERT_TRUE(done.Wait(/*milliseconds=*/100));
+}
+
+TEST(RtcpTransceiverImplTest, SendCompoundPacketDelaysPeriodicSendPackets) {
+  rtc::TaskQueue queue("rtcp");
+  FakeRtcpTransport transport;
+  RtcpTransceiverConfig config;
+  config.outgoing_transport = &transport;
+  config.initial_report_delay_ms = 0;
+  config.report_period_ms = 10;
+  config.task_queue = &queue;
+  rtc::Optional<RtcpTransceiverImpl> rtcp_transceiver;
+  queue.PostTask([&] { rtcp_transceiver.emplace(config); });
+
+  // Wait for first packet.
+  EXPECT_TRUE(transport.WaitPacket(config.initial_report_delay_ms +
+                                   kTaskQueuePrecisionMs));
+  // Wait half-period time for next one - it shouldn't be sent.
+  EXPECT_FALSE(transport.WaitPacket(config.report_period_ms / 2));
+  // Send packet now.
+  queue.PostTask([&] { rtcp_transceiver->SendCompoundPacket(); });
+  EXPECT_TRUE(transport.WaitPacket(/*timeout_ms=*/1));
+  int64_t time_of_non_periodic_packet_ms = rtc::TimeMillis();
+  // Next packet should be sent at least after period_ms.
+  EXPECT_TRUE(
+      transport.WaitPacket(config.report_period_ms + kTaskQueuePrecisionMs));
+  EXPECT_GE(rtc::TimeMillis() - time_of_non_periodic_packet_ms,
+            config.report_period_ms);
+
+  // Cleanup.
+  rtc::Event done(false, false);
+  queue.PostTask([&] {
+    rtcp_transceiver.reset();
+    done.Set();
+  });
+  ASSERT_TRUE(done.Wait(/*milliseconds=*/100));
+}
+
 TEST(RtcpTransceiverImplTest, SendsMinimalCompoundPacket) {
   const uint32_t kSenderSsrc = 12345;
   MockTransport outgoing_transport;
@@ -43,6 +158,7 @@
   config.feedback_ssrc = kSenderSsrc;
   config.cname = "cname";
   config.outgoing_transport = &outgoing_transport;
+  config.schedule_periodic_compound_packets = false;
   RtcpTransceiverImpl rtcp_transceiver(config);
 
   RtcpPacketParser rtcp_parser;
@@ -78,6 +194,7 @@
   config.feedback_ssrc = kSenderSsrc;
   config.outgoing_transport = &outgoing_transport;
   config.receive_statistics = &receive_statistics;
+  config.schedule_periodic_compound_packets = false;
   RtcpTransceiverImpl rtcp_transceiver(config);
 
   rtcp_transceiver.SendCompoundPacket();