Use moving median filters in RemoteNtpTimeEstimator, RtpToNtpEstimator

If Webrtc-ClockEstimation experiment is enabled, median filtering is
applied to results of RtpToNtpEstimator and RemoteNtpEstimator to smooth
out random errors introduced by incorrect RTCP SR reports and networking
delays.

Bug: webrtc:8468
Change-Id: Iec6d094d2809d1efeb0b9483059167d9a03880da
Reviewed-on: https://webrtc-review.googlesource.com/22682
Commit-Queue: Ilya Nikolaevskiy <ilnik@webrtc.org>
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Reviewed-by: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20682}
diff --git a/modules/rtp_rtcp/BUILD.gn b/modules/rtp_rtcp/BUILD.gn
index 9f9de96..c391d31 100644
--- a/modules/rtp_rtcp/BUILD.gn
+++ b/modules/rtp_rtcp/BUILD.gn
@@ -197,6 +197,7 @@
     "../../logging:rtc_event_log_api",
     "../../rtc_base:gtest_prod",
     "../../rtc_base:rtc_base_approved",
+    "../../rtc_base:rtc_numerics",
     "../../rtc_base:sequenced_task_checker",
     "../../system_wrappers",
     "../audio_coding:audio_format_conversion",
diff --git a/modules/rtp_rtcp/include/remote_ntp_time_estimator.h b/modules/rtp_rtcp/include/remote_ntp_time_estimator.h
index ef0ccdf..9866515 100644
--- a/modules/rtp_rtcp/include/remote_ntp_time_estimator.h
+++ b/modules/rtp_rtcp/include/remote_ntp_time_estimator.h
@@ -14,6 +14,7 @@
 #include <memory>
 
 #include "rtc_base/constructormagic.h"
+#include "rtc_base/numerics/moving_median_filter.h"
 #include "system_wrappers/include/rtp_to_ntp_estimator.h"
 
 namespace webrtc {
@@ -43,8 +44,10 @@
  private:
   Clock* clock_;
   std::unique_ptr<TimestampExtrapolator> ts_extrapolator_;
+  MovingMedianFilter<int64_t> ntp_clocks_offset_estimator_;
   RtpToNtpEstimator rtp_to_ntp_;
   int64_t last_timing_log_ms_;
+  const bool is_experiment_enabled_;
   RTC_DISALLOW_COPY_AND_ASSIGN(RemoteNtpTimeEstimator);
 };
 
diff --git a/modules/rtp_rtcp/source/remote_ntp_time_estimator.cc b/modules/rtp_rtcp/source/remote_ntp_time_estimator.cc
index 06f17a1..7b04946 100644
--- a/modules/rtp_rtcp/source/remote_ntp_time_estimator.cc
+++ b/modules/rtp_rtcp/source/remote_ntp_time_estimator.cc
@@ -12,19 +12,28 @@
 
 #include "rtc_base/logging.h"
 #include "system_wrappers/include/clock.h"
+#include "system_wrappers/include/field_trial.h"
 #include "system_wrappers/include/timestamp_extrapolator.h"
 
 namespace webrtc {
 
+namespace {
 static const int kTimingLogIntervalMs = 10000;
+static const int kClocksOffsetSmoothingWindow = 100;
+
+bool IsClockEstimationExperimentEnabled() {
+  return webrtc::field_trial::IsEnabled("WebRTC-ClockEstimation");
+}
+}  // namespace
 
 // TODO(wu): Refactor this class so that it can be shared with
 // vie_sync_module.cc.
 RemoteNtpTimeEstimator::RemoteNtpTimeEstimator(Clock* clock)
     : clock_(clock),
       ts_extrapolator_(new TimestampExtrapolator(clock_->TimeInMilliseconds())),
-      last_timing_log_ms_(-1) {
-}
+      ntp_clocks_offset_estimator_(kClocksOffsetSmoothingWindow),
+      last_timing_log_ms_(-1),
+      is_experiment_enabled_(IsClockEstimationExperimentEnabled()) {}
 
 RemoteNtpTimeEstimator::~RemoteNtpTimeEstimator() {}
 
@@ -41,12 +50,18 @@
     // No new RTCP SR since last time this function was called.
     return true;
   }
+
   // Update extrapolator with the new arrival time.
   // The extrapolator assumes the TimeInMilliseconds time.
   int64_t receiver_arrival_time_ms = clock_->TimeInMilliseconds();
   int64_t sender_send_time_ms = Clock::NtpToMs(ntp_secs, ntp_frac);
   int64_t sender_arrival_time_90k = (sender_send_time_ms + rtt / 2) * 90;
   ts_extrapolator_->Update(receiver_arrival_time_ms, sender_arrival_time_90k);
+
+  int64_t sender_arrival_time_ms = sender_send_time_ms + rtt / 2;
+  int64_t remote_to_local_clocks_offset =
+      receiver_arrival_time_ms - sender_arrival_time_ms;
+  ntp_clocks_offset_estimator_.Insert(remote_to_local_clocks_offset);
   return true;
 }
 
@@ -55,13 +70,21 @@
   if (!rtp_to_ntp_.Estimate(rtp_timestamp, &sender_capture_ntp_ms)) {
     return -1;
   }
-  uint32_t timestamp = sender_capture_ntp_ms * 90;
-  int64_t receiver_capture_ms =
-      ts_extrapolator_->ExtrapolateLocalTime(timestamp);
-  int64_t ntp_offset =
-      clock_->CurrentNtpInMilliseconds() - clock_->TimeInMilliseconds();
-  int64_t receiver_capture_ntp_ms = receiver_capture_ms + ntp_offset;
+
+  int64_t receiver_capture_ms;
+
+  if (is_experiment_enabled_) {
+    int64_t remote_to_local_clocks_offset =
+        ntp_clocks_offset_estimator_.GetFilteredValue();
+    receiver_capture_ms = sender_capture_ntp_ms + remote_to_local_clocks_offset;
+  } else {
+    uint32_t timestamp = sender_capture_ntp_ms * 90;
+    receiver_capture_ms = ts_extrapolator_->ExtrapolateLocalTime(timestamp);
+  }
   int64_t now_ms = clock_->TimeInMilliseconds();
+  int64_t ntp_offset = clock_->CurrentNtpInMilliseconds() - now_ms;
+  int64_t receiver_capture_ntp_ms = receiver_capture_ms + ntp_offset;
+
   if (now_ms - last_timing_log_ms_ > kTimingLogIntervalMs) {
     RTC_LOG(LS_INFO) << "RTP timestamp: " << rtp_timestamp
                      << " in NTP clock: " << sender_capture_ntp_ms
diff --git a/modules/rtp_rtcp/source/remote_ntp_time_estimator_unittest.cc b/modules/rtp_rtcp/source/remote_ntp_time_estimator_unittest.cc
index 9812f26..410085e 100644
--- a/modules/rtp_rtcp/source/remote_ntp_time_estimator_unittest.cc
+++ b/modules/rtp_rtcp/source/remote_ntp_time_estimator_unittest.cc
@@ -8,9 +8,10 @@
 *  be found in the AUTHORS file in the root of the source tree.
 */
 
-#include "common_types.h"  // NOLINT(build/include)
 #include "modules/rtp_rtcp/include/remote_ntp_time_estimator.h"
+#include "common_types.h"  // NOLINT(build/include)
 #include "system_wrappers/include/clock.h"
+#include "test/field_trial.h"
 #include "test/gmock.h"
 #include "test/gtest.h"
 
@@ -31,7 +32,7 @@
   RemoteNtpTimeEstimatorTest()
       : local_clock_(kLocalClockInitialTimeMs * 1000),
         remote_clock_(kRemoteClockInitialTimeMs * 1000),
-        estimator_(&local_clock_) {}
+        estimator_(new RemoteNtpTimeEstimator(&local_clock_)) {}
   ~RemoteNtpTimeEstimatorTest() {}
 
   void AdvanceTimeMilliseconds(int64_t ms) {
@@ -52,11 +53,21 @@
     ReceiveRtcpSr(kTestRtt, rtcp_timestamp, ntp.seconds(), ntp.fractions());
   }
 
+  void SendRtcpSrInaccurately(int64_t ntp_error_ms,
+                              int64_t networking_delay_ms) {
+    uint32_t rtcp_timestamp = GetRemoteTimestamp();
+    int64_t ntp_error_fractions =
+        ntp_error_ms * NtpTime::kFractionsPerSecond / 1000;
+    NtpTime ntp(static_cast<uint64_t>(remote_clock_.CurrentNtpTime()) +
+                ntp_error_fractions);
+    AdvanceTimeMilliseconds(kTestRtt / 2 + networking_delay_ms);
+    ReceiveRtcpSr(kTestRtt, rtcp_timestamp, ntp.seconds(), ntp.fractions());
+  }
+
   void UpdateRtcpTimestamp(int64_t rtt, uint32_t ntp_secs, uint32_t ntp_frac,
                            uint32_t rtp_timestamp, bool expected_result) {
-    EXPECT_EQ(expected_result,
-              estimator_.UpdateRtcpTimestamp(rtt, ntp_secs, ntp_frac,
-                                             rtp_timestamp));
+    EXPECT_EQ(expected_result, estimator_->UpdateRtcpTimestamp(
+                                   rtt, ntp_secs, ntp_frac, rtp_timestamp));
   }
 
   void ReceiveRtcpSr(int64_t rtt,
@@ -68,7 +79,7 @@
 
   SimulatedClock local_clock_;
   SimulatedClock remote_clock_;
-  RemoteNtpTimeEstimator estimator_;
+  std::unique_ptr<RemoteNtpTimeEstimator> estimator_;
 };
 
 TEST_F(RemoteNtpTimeEstimatorTest, Estimate) {
@@ -86,14 +97,54 @@
 
   // Local peer needs at least 2 RTCP SR to calculate the capture time.
   const int64_t kNotEnoughRtcpSr = -1;
-  EXPECT_EQ(kNotEnoughRtcpSr, estimator_.Estimate(rtp_timestamp));
+  EXPECT_EQ(kNotEnoughRtcpSr, estimator_->Estimate(rtp_timestamp));
 
   AdvanceTimeMilliseconds(800);
   // Remote sends second RTCP SR.
   SendRtcpSr();
 
   // Local peer gets enough RTCP SR to calculate the capture time.
-  EXPECT_EQ(capture_ntp_time_ms, estimator_.Estimate(rtp_timestamp));
+  EXPECT_EQ(capture_ntp_time_ms, estimator_->Estimate(rtp_timestamp));
+}
+
+TEST_F(RemoteNtpTimeEstimatorTest, AveragesErrorsOut) {
+  test::ScopedFieldTrials override_field_trials(
+      "WebRTC-ClockEstimation/Enabled/");
+  // Reset estimator_ because it checks experiment status during construction.
+  estimator_.reset(new RemoteNtpTimeEstimator(&local_clock_));
+
+  // Remote peer sends first 5 RTCP SR without errors.
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSr();
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSr();
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSr();
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSr();
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSr();
+
+  AdvanceTimeMilliseconds(15);
+  uint32_t rtp_timestamp = GetRemoteTimestamp();
+  int64_t capture_ntp_time_ms = local_clock_.CurrentNtpInMilliseconds();
+
+  // Local peer gets enough RTCP SR to calculate the capture time.
+  EXPECT_EQ(capture_ntp_time_ms, estimator_->Estimate(rtp_timestamp));
+
+  // Remote sends corrupted RTCP SRs
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSrInaccurately(10, 10);
+  AdvanceTimeMilliseconds(1000);
+  SendRtcpSrInaccurately(-20, 5);
+
+  // New RTP packet to estimate timestamp.
+  AdvanceTimeMilliseconds(150);
+  rtp_timestamp = GetRemoteTimestamp();
+  capture_ntp_time_ms = local_clock_.CurrentNtpInMilliseconds();
+
+  // Errors should be averaged out.
+  EXPECT_EQ(capture_ntp_time_ms, estimator_->Estimate(rtp_timestamp));
 }
 
 }  // namespace webrtc
diff --git a/system_wrappers/BUILD.gn b/system_wrappers/BUILD.gn
index fb376fe..ca11b89 100644
--- a/system_wrappers/BUILD.gn
+++ b/system_wrappers/BUILD.gn
@@ -101,7 +101,10 @@
     suppressed_configs += [ "//build/config/clang:find_bad_constructs" ]
   }
 
-  deps += [ "../rtc_base:rtc_base_approved" ]
+  deps += [
+    "../rtc_base:rtc_base_approved",
+    "../rtc_base:rtc_numerics",
+  ]
 }
 
 rtc_source_set("cpu_features_api") {
diff --git a/system_wrappers/include/rtp_to_ntp_estimator.h b/system_wrappers/include/rtp_to_ntp_estimator.h
index 6aad545..3e85807 100644
--- a/system_wrappers/include/rtp_to_ntp_estimator.h
+++ b/system_wrappers/include/rtp_to_ntp_estimator.h
@@ -15,6 +15,7 @@
 
 #include "api/optional.h"
 #include "modules/include/module_common_types_public.h"
+#include "rtc_base/numerics/moving_median_filter.h"
 #include "system_wrappers/include/ntp_time.h"
 #include "typedefs.h"  // NOLINT(build/include)
 
@@ -40,8 +41,23 @@
 
   // Estimated parameters from RTP and NTP timestamp pairs in |measurements_|.
   struct Parameters {
+    // Implicit conversion from int because MovingMedianFilter returns 0
+    // internally if no samples are present. However, it should never happen as
+    // we don't ask smoothing_filter_ to return anything if there were no
+    // samples.
+    Parameters(const int& value) {  // NOLINT
+      RTC_NOTREACHED();
+    }
+    Parameters() : frequency_khz(0.0), offset_ms(0.0) {}
+
     double frequency_khz;
     double offset_ms;
+
+    // Needed to make it work inside MovingMedianFilter
+    bool operator<(const Parameters& other) const;
+    bool operator==(const Parameters& other) const;
+    bool operator<=(const Parameters& other) const;
+    bool operator!=(const Parameters& other) const;
   };
 
   // Updates measurements with RTP/NTP timestamp pair from a RTCP sender report.
@@ -55,13 +71,8 @@
   // Returns true on success, false otherwise.
   bool Estimate(int64_t rtp_timestamp, int64_t* rtp_timestamp_ms) const;
 
-  const rtc::Optional<Parameters> params() const {
-    rtc::Optional<Parameters> res;
-    if (params_calculated_) {
-      res.emplace(params_);
-    }
-    return res;
-  }
+  // Returns estimated rtp to ntp linear transform parameters.
+  const rtc::Optional<Parameters> params() const;
 
   static const int kMaxInvalidSamples = 3;
 
@@ -71,8 +82,10 @@
   int consecutive_invalid_samples_;
   std::list<RtcpMeasurement> measurements_;
   Parameters params_;
+  MovingMedianFilter<Parameters> smoothing_filter_;
   bool params_calculated_;
   mutable TimestampUnwrapper unwrapper_;
+  const bool is_experiment_enabled_;
 };
 }  // namespace webrtc
 
diff --git a/system_wrappers/source/rtp_to_ntp_estimator.cc b/system_wrappers/source/rtp_to_ntp_estimator.cc
index cd33f32..55cdfab 100644
--- a/system_wrappers/source/rtp_to_ntp_estimator.cc
+++ b/system_wrappers/source/rtp_to_ntp_estimator.cc
@@ -13,11 +13,18 @@
 #include "rtc_base/checks.h"
 #include "rtc_base/logging.h"
 #include "system_wrappers/include/clock.h"
+#include "system_wrappers/include/field_trial.h"
 
 namespace webrtc {
 namespace {
 // Number of RTCP SR reports to use to map between RTP and NTP.
 const size_t kNumRtcpReportsToUse = 2;
+// Number of parameters samples used to smooth.
+const size_t kNumSamplesToSmooth = 20;
+
+bool IsClockEstimationExperimentEnabled() {
+  return webrtc::field_trial::IsEnabled("WebRTC-ClockEstimation");
+}
 
 // Calculates the RTP timestamp frequency from two pairs of NTP/RTP timestamps.
 bool CalculateFrequency(int64_t ntp_ms1,
@@ -43,6 +50,28 @@
 }
 }  // namespace
 
+bool RtpToNtpEstimator::Parameters::operator<(const Parameters& other) const {
+  if (frequency_khz < other.frequency_khz - 1e-6) {
+    return true;
+  } else if (frequency_khz > other.frequency_khz + 1e-6) {
+    return false;
+  } else {
+    return offset_ms < other.offset_ms - 1e-6;
+  }
+}
+
+bool RtpToNtpEstimator::Parameters::operator==(const Parameters& other) const {
+  return !(other < *this || *this < other);
+}
+
+bool RtpToNtpEstimator::Parameters::operator!=(const Parameters& other) const {
+  return other < *this || *this < other;
+}
+
+bool RtpToNtpEstimator::Parameters::operator<=(const Parameters& other) const {
+  return !(other < *this);
+}
+
 RtpToNtpEstimator::RtcpMeasurement::RtcpMeasurement(uint32_t ntp_secs,
                                                     uint32_t ntp_frac,
                                                     int64_t unwrapped_timestamp)
@@ -59,13 +88,18 @@
 
 // Class for converting an RTP timestamp to the NTP domain.
 RtpToNtpEstimator::RtpToNtpEstimator()
-    : consecutive_invalid_samples_(0), params_calculated_(false) {}
+    : consecutive_invalid_samples_(0),
+      smoothing_filter_(kNumSamplesToSmooth),
+      params_calculated_(false),
+      is_experiment_enabled_(IsClockEstimationExperimentEnabled()) {}
+
 RtpToNtpEstimator::~RtpToNtpEstimator() {}
 
 void RtpToNtpEstimator::UpdateParameters() {
   if (measurements_.size() != kNumRtcpReportsToUse)
     return;
 
+  Parameters params;
   int64_t timestamp_new = measurements_.front().unwrapped_rtp_timestamp;
   int64_t timestamp_old = measurements_.back().unwrapped_rtp_timestamp;
 
@@ -73,11 +107,16 @@
   int64_t ntp_ms_old = measurements_.back().ntp_time.ToMs();
 
   if (!CalculateFrequency(ntp_ms_new, timestamp_new, ntp_ms_old, timestamp_old,
-                          &params_.frequency_khz)) {
+                          &params.frequency_khz)) {
     return;
   }
-  params_.offset_ms = timestamp_new - params_.frequency_khz * ntp_ms_new;
+  params.offset_ms = timestamp_new - params.frequency_khz * ntp_ms_new;
   params_calculated_ = true;
+  if (is_experiment_enabled_) {
+    smoothing_filter_.Insert(params);
+  } else {
+    params_ = params;
+  }
 }
 
 bool RtpToNtpEstimator::UpdateMeasurements(uint32_t ntp_secs,
@@ -94,6 +133,7 @@
     // RTCP SR report already added.
     return true;
   }
+
   if (!new_measurement.ntp_time.Valid())
     return false;
 
@@ -122,6 +162,7 @@
     RTC_LOG(LS_WARNING) << "Multiple consecutively invalid RTCP SR reports, "
                            "clearing measurements.";
     measurements_.clear();
+    smoothing_filter_.Reset();
     params_calculated_ = false;
   }
   consecutive_invalid_samples_ = 0;
@@ -145,12 +186,15 @@
 
   int64_t rtp_timestamp_unwrapped = unwrapper_.Unwrap(rtp_timestamp);
 
-  // params_calculated_ should not be true unless ms params_.frequency_khz has
+  Parameters params =
+      is_experiment_enabled_ ? smoothing_filter_.GetFilteredValue() : params_;
+
+  // params_calculated_ should not be true unless ms params.frequency_khz has
   // been calculated to something non zero.
-  RTC_DCHECK_NE(params_.frequency_khz, 0.0);
+  RTC_DCHECK_NE(params.frequency_khz, 0.0);
   double rtp_ms =
-      (static_cast<double>(rtp_timestamp_unwrapped) - params_.offset_ms) /
-          params_.frequency_khz +
+      (static_cast<double>(rtp_timestamp_unwrapped) - params.offset_ms) /
+          params.frequency_khz +
       0.5f;
 
   if (rtp_ms < 0)
@@ -159,4 +203,14 @@
   *rtp_timestamp_ms = rtp_ms;
   return true;
 }
+
+const rtc::Optional<RtpToNtpEstimator::Parameters> RtpToNtpEstimator::params()
+    const {
+  rtc::Optional<Parameters> res;
+  if (params_calculated_) {
+    res.emplace(is_experiment_enabled_ ? smoothing_filter_.GetFilteredValue()
+                                       : params_);
+  }
+  return res;
+}
 }  // namespace webrtc