Call OnDecodedFrame asynchronously on the worker thread.

This offloads the decoder thread with managing histograms,
moves the management over to the thread on which they're queried.
This will allow us to remove more locking from the decoder threads
and avoid contention when querying for stats.

Bug: webrtc:11489
Change-Id: I563c90a0ed01e0b3598ee314d8118622216a2e0f
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174201
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Rasmus Brandt <brandtr@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31201}
diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc
index c16dd8a..cafea6c 100644
--- a/video/receive_statistics_proxy2.cc
+++ b/video/receive_statistics_proxy2.cc
@@ -137,21 +137,7 @@
     absl::optional<int> fraction_lost,
     const StreamDataCounters& rtp_stats,
     const StreamDataCounters* rtx_stats) {
-  {
-    // TODO(bugs.webrtc.org/11489): Delete this scope after refactoring.
-    // We're actually on the main thread here, below is the explanation for
-    // why we use another thread checker. Once refactored, we can clean this
-    // up and not use the decode_queue_ checker here.
-    RTC_DCHECK_RUN_ON(&main_thread_);
-  }
-
-  // We're not actually running on the decoder thread, but must be called after
-  // DecoderThreadStopped, which detaches the thread checker. It is therefore
-  // safe to access |qp_counters_|, which were updated on the decode thread
-  // earlier.
-  RTC_DCHECK_RUN_ON(&decode_queue_);
-
-  rtc::CritScope lock(&crit_);
+  RTC_DCHECK_RUN_ON(&main_thread_);
 
   // TODO(bugs.webrtc.org/11489): Many of these variables don't need to be
   // inside the scope of a lock. Also consider grabbing the lock only to copy
@@ -162,6 +148,8 @@
 
   int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000;
 
+  rtc::CritScope lock(&crit_);
+
   if (stats_.frame_counts.key_frames > 0 ||
       stats_.frame_counts.delta_frames > 0) {
     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds",
@@ -253,10 +241,17 @@
                << key_frames_permille << '\n';
   }
 
-  absl::optional<int> qp = qp_counters_.vp8.Avg(kMinRequiredSamples);
-  if (qp) {
-    RTC_HISTOGRAM_COUNTS_200("WebRTC.Video.Decoded.Vp8.Qp", *qp);
-    log_stream << "WebRTC.Video.Decoded.Vp8.Qp " << *qp << '\n';
+  {
+    // We're not actually running on the decoder thread, but this function must
+    // be called after DecoderThreadStopped, which detaches the thread checker.
+    // It is therefore safe to access |qp_counters_|, which were updated on the
+    // decode thread earlier.
+    RTC_DCHECK_RUN_ON(&decode_queue_);
+    absl::optional<int> qp = qp_counters_.vp8.Avg(kMinRequiredSamples);
+    if (qp) {
+      RTC_HISTOGRAM_COUNTS_200("WebRTC.Video.Decoded.Vp8.Qp", *qp);
+      log_stream << "WebRTC.Video.Decoded.Vp8.Qp " << *qp << '\n';
+    }
   }
   absl::optional<int> decode_ms = decode_time_counter_.Avg(kMinRequiredSamples);
   if (decode_ms) {
@@ -573,8 +568,8 @@
 }
 
 void ReceiveStatisticsProxy::UpdateFramerate(int64_t now_ms) const {
-  // TODO(bugs.webrtc.org/11489): Currently seems to be called from two threads,
-  // main and decode. Consider moving both to main.
+  RTC_DCHECK_RUN_ON(&main_thread_);
+
   int64_t old_frames_ms = now_ms - kRateStatisticsWindowSizeMs;
   while (!frame_window_.empty() &&
          frame_window_.begin()->first < old_frames_ms) {
@@ -583,6 +578,8 @@
 
   size_t framerate =
       (frame_window_.size() * 1000 + 500) / kRateStatisticsWindowSizeMs;
+
+  rtc::CritScope lock(&crit_);
   stats_.network_frame_rate = static_cast<int>(framerate);
 }
 
@@ -590,8 +587,7 @@
     int width,
     int height,
     int decode_time_ms) const {
-  // TODO(bugs.webrtc.org/11489): Consider posting the work to the worker
-  // thread.
+  RTC_DCHECK_RUN_ON(&main_thread_);
 
   bool is_4k = (width == 3840 || width == 4096) && height == 2160;
   bool is_hd = width == 1920 && height == 1080;
@@ -663,15 +659,28 @@
   // StatsCollector::ExtractMediaInfo via worker_thread()->Invoke().
   // WebRtcVideoChannel::GetStats(), GetVideoReceiverInfo.
 
-  rtc::CritScope lock(&crit_);
   // Get current frame rates here, as only updating them on new frames prevents
   // us from ever correctly displaying frame rate of 0.
   int64_t now_ms = clock_->TimeInMilliseconds();
   UpdateFramerate(now_ms);
+
+  rtc::CritScope lock(&crit_);
   stats_.render_frame_rate = renders_fps_estimator_.Rate(now_ms).value_or(0);
   stats_.decode_frame_rate = decode_fps_estimator_.Rate(now_ms).value_or(0);
-  stats_.interframe_delay_max_ms =
-      interframe_delay_max_moving_.Max(now_ms).value_or(-1);
+
+  if (last_decoded_frame_time_ms_) {
+    // Avoid using a newer timestamp than might be pending for decoded frames.
+    // If we do use now_ms, we might roll the max window to a value that is
+    // higher than that of a decoded frame timestamp that we haven't yet
+    // captured the data for (i.e. pending call to OnDecodedFrame).
+    stats_.interframe_delay_max_ms =
+        interframe_delay_max_moving_.Max(*last_decoded_frame_time_ms_)
+            .value_or(-1);
+  } else {
+    // We're paused. Avoid changing the state of |interframe_delay_max_moving_|.
+    stats_.interframe_delay_max_ms = -1;
+  }
+
   stats_.freeze_count = video_quality_observer_->NumFreezes();
   stats_.pause_count = video_quality_observer_->NumPauses();
   stats_.total_freezes_duration_ms =
@@ -803,15 +812,24 @@
                                             absl::optional<uint8_t> qp,
                                             int32_t decode_time_ms,
                                             VideoContentType content_type) {
-  // TODO(bugs.webrtc.org/11489): On iOS this gets called on
+  // See VCMDecodedFrameCallback::Decoded for more info on what thread/queue we
+  // may be on. E.g. on iOS this gets called on
   // "com.apple.coremedia.decompressionsession.clientcallback"
-  // See VCMDecodedFrameCallback::Decoded for info on what thread/queue we may
-  // be on.
-  // RTC_DCHECK_RUN_ON(&decode_queue_);
+  VideoFrameMetaData meta(frame, clock_->CurrentTime());
+  worker_thread_->PostTask(ToQueuedTask([safety = task_safety_flag_, meta, qp,
+                                         decode_time_ms, content_type, this]() {
+    if (!safety->alive())
+      return;
+    OnDecodedFrame(meta, qp, decode_time_ms, content_type);
+  }));
+}
 
-  rtc::CritScope lock(&crit_);
-
-  const uint64_t now_ms = clock_->TimeInMilliseconds();
+void ReceiveStatisticsProxy::OnDecodedFrame(
+    const VideoFrameMetaData& frame_meta,
+    absl::optional<uint8_t> qp,
+    int32_t decode_time_ms,
+    VideoContentType content_type) {
+  RTC_DCHECK_RUN_ON(&main_thread_);
 
   const bool is_screenshare =
       videocontenttypehelpers::IsScreenshare(content_type);
@@ -825,7 +843,8 @@
     video_quality_observer_.reset(new VideoQualityObserver());
   }
 
-  video_quality_observer_->OnDecodedFrame(frame.timestamp(), qp,
+  rtc::CritScope lock(&crit_);
+  video_quality_observer_->OnDecodedFrame(frame_meta.rtp_timestamp, qp,
                                           last_codec_type_);
 
   ContentSpecificStats* content_specific_stats =
@@ -850,28 +869,32 @@
   stats_.decode_ms = decode_time_ms;
   stats_.total_decode_time_ms += decode_time_ms;
   if (enable_decode_time_histograms_) {
-    UpdateDecodeTimeHistograms(frame.width(), frame.height(), decode_time_ms);
+    UpdateDecodeTimeHistograms(frame_meta.width, frame_meta.height,
+                               decode_time_ms);
   }
 
   last_content_type_ = content_type;
-  decode_fps_estimator_.Update(1, now_ms);
+  decode_fps_estimator_.Update(1, frame_meta.decode_timestamp.ms());
+
   if (last_decoded_frame_time_ms_) {
-    int64_t interframe_delay_ms = now_ms - *last_decoded_frame_time_ms_;
+    int64_t interframe_delay_ms =
+        frame_meta.decode_timestamp.ms() - *last_decoded_frame_time_ms_;
     RTC_DCHECK_GE(interframe_delay_ms, 0);
     double interframe_delay = interframe_delay_ms / 1000.0;
     stats_.total_inter_frame_delay += interframe_delay;
     stats_.total_squared_inter_frame_delay +=
         interframe_delay * interframe_delay;
-    interframe_delay_max_moving_.Add(interframe_delay_ms, now_ms);
+    interframe_delay_max_moving_.Add(interframe_delay_ms,
+                                     frame_meta.decode_timestamp.ms());
     content_specific_stats->interframe_delay_counter.Add(interframe_delay_ms);
     content_specific_stats->interframe_delay_percentiles.Add(
         interframe_delay_ms);
     content_specific_stats->flow_duration_ms += interframe_delay_ms;
   }
   if (stats_.frames_decoded == 1) {
-    first_decoded_frame_time_ms_.emplace(now_ms);
+    first_decoded_frame_time_ms_.emplace(frame_meta.decode_timestamp.ms());
   }
-  last_decoded_frame_time_ms_.emplace(now_ms);
+  last_decoded_frame_time_ms_.emplace(frame_meta.decode_timestamp.ms());
 }
 
 void ReceiveStatisticsProxy::OnRenderedFrame(
@@ -940,6 +963,7 @@
 void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe,
                                              size_t size_bytes,
                                              VideoContentType content_type) {
+  RTC_DCHECK_RUN_ON(&main_thread_);
   rtc::CritScope lock(&crit_);
   if (is_keyframe) {
     ++stats_.frame_counts.key_frames;
@@ -985,13 +1009,13 @@
 }
 
 void ReceiveStatisticsProxy::OnStreamInactive() {
-  RTC_DCHECK_RUN_ON(&decode_queue_);
+  RTC_DCHECK_RUN_ON(&main_thread_);
 
   // TODO(sprang): Figure out any other state that should be reset.
 
-  rtc::CritScope lock(&crit_);
   // Don't report inter-frame delay if stream was paused.
   last_decoded_frame_time_ms_.reset();
+
   video_quality_observer_->OnStreamInactive();
 }
 
diff --git a/video/receive_statistics_proxy2.h b/video/receive_statistics_proxy2.h
index a8b3824..6e68266 100644
--- a/video/receive_statistics_proxy2.h
+++ b/video/receive_statistics_proxy2.h
@@ -61,6 +61,15 @@
                       absl::optional<uint8_t> qp,
                       int32_t decode_time_ms,
                       VideoContentType content_type);
+
+  // Called asyncronously on the worker thread as a result of a call to the
+  // above OnDecodedFrame method, which is called back on the thread where
+  // the actual decoding happens.
+  void OnDecodedFrame(const VideoFrameMetaData& frame_meta,
+                      absl::optional<uint8_t> qp,
+                      int32_t decode_time_ms,
+                      VideoContentType content_type);
+
   void OnSyncOffsetUpdated(int64_t video_playout_ntp_ms,
                            int64_t sync_offset_ms,
                            double estimated_freq_khz);
@@ -136,8 +145,7 @@
   void QualitySample(Timestamp now) RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
 
   // Removes info about old frames and then updates the framerate.
-  void UpdateFramerate(int64_t now_ms) const
-      RTC_EXCLUSIVE_LOCKS_REQUIRED(crit_);
+  void UpdateFramerate(int64_t now_ms) const;
 
   void UpdateDecodeTimeHistograms(int width,
                                   int height,
@@ -152,8 +160,9 @@
   const bool enable_decode_time_histograms_;
 
   rtc::CriticalSection crit_;
-  int64_t last_sample_time_ RTC_GUARDED_BY(crit_);
-  QualityThreshold fps_threshold_ RTC_GUARDED_BY(crit_);
+  int64_t last_sample_time_ RTC_GUARDED_BY(main_thread_);
+
+  QualityThreshold fps_threshold_ RTC_GUARDED_BY(main_thread_);
   QualityThreshold qp_threshold_ RTC_GUARDED_BY(crit_);
   QualityThreshold variance_threshold_ RTC_GUARDED_BY(crit_);
   rtc::SampleCounter qp_sample_ RTC_GUARDED_BY(crit_);
@@ -174,20 +183,21 @@
   rtc::SampleCounter current_delay_counter_ RTC_GUARDED_BY(crit_);
   rtc::SampleCounter delay_counter_ RTC_GUARDED_BY(crit_);
   std::unique_ptr<VideoQualityObserver> video_quality_observer_
-      RTC_GUARDED_BY(crit_);
+      RTC_GUARDED_BY(main_thread_);
   mutable rtc::MovingMaxCounter<int> interframe_delay_max_moving_
-      RTC_GUARDED_BY(crit_);
+      RTC_GUARDED_BY(main_thread_);
   std::map<VideoContentType, ContentSpecificStats> content_specific_stats_
       RTC_GUARDED_BY(crit_);
   MaxCounter freq_offset_counter_ RTC_GUARDED_BY(crit_);
   QpCounters qp_counters_ RTC_GUARDED_BY(decode_queue_);
   int64_t avg_rtt_ms_ RTC_GUARDED_BY(crit_);
-  mutable std::map<int64_t, size_t> frame_window_ RTC_GUARDED_BY(&crit_);
-  VideoContentType last_content_type_ RTC_GUARDED_BY(&crit_);
+  mutable std::map<int64_t, size_t> frame_window_ RTC_GUARDED_BY(main_thread_);
+  VideoContentType last_content_type_ RTC_GUARDED_BY(&main_thread_);
   VideoCodecType last_codec_type_ RTC_GUARDED_BY(&crit_);
   absl::optional<int64_t> first_frame_received_time_ms_ RTC_GUARDED_BY(&crit_);
   absl::optional<int64_t> first_decoded_frame_time_ms_ RTC_GUARDED_BY(&crit_);
-  absl::optional<int64_t> last_decoded_frame_time_ms_ RTC_GUARDED_BY(&crit_);
+  absl::optional<int64_t> last_decoded_frame_time_ms_
+      RTC_GUARDED_BY(main_thread_);
   size_t num_delayed_frames_rendered_ RTC_GUARDED_BY(&crit_);
   int64_t sum_missed_render_deadline_ms_ RTC_GUARDED_BY(&crit_);
   // Mutable because calling Max() on MovingMaxCounter is not const. Yet it is
diff --git a/video/receive_statistics_proxy2_unittest.cc b/video/receive_statistics_proxy2_unittest.cc
index 7ad71dc..90c6a32 100644
--- a/video/receive_statistics_proxy2_unittest.cc
+++ b/video/receive_statistics_proxy2_unittest.cc
@@ -53,6 +53,19 @@
   ~ReceiveStatisticsProxy2Test() override { statistics_proxy_.reset(); }
 
  protected:
+  // Convenience method to avoid too many explict flushes.
+  VideoReceiveStream::Stats FlushAndGetStats() {
+    loop_.Flush();
+    return statistics_proxy_->GetStats();
+  }
+
+  void FlushAndUpdateHistograms(absl::optional<int> fraction_lost,
+                                const StreamDataCounters& rtp_stats,
+                                const StreamDataCounters* rtx_stats) {
+    loop_.Flush();
+    statistics_proxy_->UpdateHistograms(fraction_lost, rtp_stats, rtx_stats);
+  }
+
   VideoReceiveStream::Config GetTestConfig() {
     VideoReceiveStream::Config config(nullptr);
     config.rtp.local_ssrc = kLocalSsrc;
@@ -109,7 +122,7 @@
   for (uint32_t i = 1; i <= 3; ++i) {
     statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                       VideoContentType::UNSPECIFIED);
-    EXPECT_EQ(i, statistics_proxy_->GetStats().frames_decoded);
+    EXPECT_EQ(i, FlushAndGetStats().frames_decoded);
   }
 }
 
@@ -122,8 +135,7 @@
                                       VideoContentType::UNSPECIFIED);
     fake_clock_.AdvanceTimeMilliseconds(1000 / kFps);
   }
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(1,
                    metrics::NumSamples("WebRTC.Video.DecodedFramesPerSecond"));
   EXPECT_METRIC_EQ(
@@ -139,8 +151,7 @@
                                       VideoContentType::UNSPECIFIED);
     fake_clock_.AdvanceTimeMilliseconds(1000 / kFps);
   }
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(0,
                    metrics::NumSamples("WebRTC.Video.DecodedFramesPerSecond"));
 }
@@ -156,6 +167,7 @@
                                       VideoContentType::UNSPECIFIED);
     expected_total_decode_time_ms += 1;
     ++expected_frames_decoded;
+    loop_.Flush();
     EXPECT_EQ(expected_frames_decoded,
               statistics_proxy_->GetStats().frames_decoded);
     EXPECT_EQ(expected_total_decode_time_ms,
@@ -165,6 +177,7 @@
                                     VideoContentType::UNSPECIFIED);
   ++expected_frames_decoded;
   expected_total_decode_time_ms += 3;
+  loop_.Flush();
   EXPECT_EQ(expected_frames_decoded,
             statistics_proxy_->GetStats().frames_decoded);
   EXPECT_EQ(expected_total_decode_time_ms,
@@ -176,10 +189,10 @@
   webrtc::VideoFrame frame = CreateFrame(kWidth, kHeight);
   statistics_proxy_->OnDecodedFrame(frame, 3u, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(3u, statistics_proxy_->GetStats().qp_sum);
+  EXPECT_EQ(3u, FlushAndGetStats().qp_sum);
   statistics_proxy_->OnDecodedFrame(frame, 127u, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(130u, statistics_proxy_->GetStats().qp_sum);
+  EXPECT_EQ(130u, FlushAndGetStats().qp_sum);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, OnDecodedFrameIncreasesTotalDecodeTime) {
@@ -187,10 +200,10 @@
   webrtc::VideoFrame frame = CreateFrame(kWidth, kHeight);
   statistics_proxy_->OnDecodedFrame(frame, 3u, 4,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(4u, statistics_proxy_->GetStats().total_decode_time_ms);
+  EXPECT_EQ(4u, FlushAndGetStats().total_decode_time_ms);
   statistics_proxy_->OnDecodedFrame(frame, 127u, 7,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(11u, statistics_proxy_->GetStats().total_decode_time_ms);
+  EXPECT_EQ(11u, FlushAndGetStats().total_decode_time_ms);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, ReportsContentType) {
@@ -202,12 +215,11 @@
   statistics_proxy_->OnDecodedFrame(frame, 3u, 0,
                                     VideoContentType::SCREENSHARE);
   EXPECT_EQ(kScreenshareString,
-            videocontenttypehelpers::ToString(
-                statistics_proxy_->GetStats().content_type));
+            videocontenttypehelpers::ToString(FlushAndGetStats().content_type));
   statistics_proxy_->OnDecodedFrame(frame, 3u, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(kRealtimeString, videocontenttypehelpers::ToString(
-                                 statistics_proxy_->GetStats().content_type));
+  EXPECT_EQ(kRealtimeString,
+            videocontenttypehelpers::ToString(FlushAndGetStats().content_type));
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, ReportsMaxTotalInterFrameDelay) {
@@ -225,10 +237,9 @@
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
   EXPECT_DOUBLE_EQ(expected_total_inter_frame_delay,
-                   statistics_proxy_->GetStats().total_inter_frame_delay);
-  EXPECT_DOUBLE_EQ(
-      expected_total_squared_inter_frame_delay,
-      statistics_proxy_->GetStats().total_squared_inter_frame_delay);
+                   FlushAndGetStats().total_inter_frame_delay);
+  EXPECT_DOUBLE_EQ(expected_total_squared_inter_frame_delay,
+                   FlushAndGetStats().total_squared_inter_frame_delay);
 
   fake_clock_.AdvanceTime(kInterFrameDelay1);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
@@ -237,7 +248,7 @@
   expected_total_squared_inter_frame_delay +=
       pow(kInterFrameDelay1.seconds<double>(), 2.0);
   EXPECT_DOUBLE_EQ(expected_total_inter_frame_delay,
-                   statistics_proxy_->GetStats().total_inter_frame_delay);
+                   FlushAndGetStats().total_inter_frame_delay);
   EXPECT_DOUBLE_EQ(
       expected_total_squared_inter_frame_delay,
       statistics_proxy_->GetStats().total_squared_inter_frame_delay);
@@ -249,7 +260,7 @@
   expected_total_squared_inter_frame_delay +=
       pow(kInterFrameDelay2.seconds<double>(), 2.0);
   EXPECT_DOUBLE_EQ(expected_total_inter_frame_delay,
-                   statistics_proxy_->GetStats().total_inter_frame_delay);
+                   FlushAndGetStats().total_inter_frame_delay);
   EXPECT_DOUBLE_EQ(
       expected_total_squared_inter_frame_delay,
       statistics_proxy_->GetStats().total_squared_inter_frame_delay);
@@ -261,7 +272,7 @@
   expected_total_squared_inter_frame_delay +=
       pow(kInterFrameDelay3.seconds<double>(), 2.0);
   EXPECT_DOUBLE_EQ(expected_total_inter_frame_delay,
-                   statistics_proxy_->GetStats().total_inter_frame_delay);
+                   FlushAndGetStats().total_inter_frame_delay);
   EXPECT_DOUBLE_EQ(
       expected_total_squared_inter_frame_delay,
       statistics_proxy_->GetStats().total_squared_inter_frame_delay);
@@ -275,26 +286,23 @@
   EXPECT_EQ(-1, statistics_proxy_->GetStats().interframe_delay_max_ms);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(-1, statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(-1, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs1);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(kInterframeDelayMs1,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs1, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs2);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(kInterframeDelayMs2,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs2, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs3);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
   // kInterframeDelayMs3 is smaller than kInterframeDelayMs2.
-  EXPECT_EQ(kInterframeDelayMs2,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs2, FlushAndGetStats().interframe_delay_max_ms);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, ReportInterframeDelayInWindow) {
@@ -305,27 +313,24 @@
   EXPECT_EQ(-1, statistics_proxy_->GetStats().interframe_delay_max_ms);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(-1, statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(-1, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs1);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(kInterframeDelayMs1,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs1, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs2);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
   // Still first delay is the maximum
-  EXPECT_EQ(kInterframeDelayMs1,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs1, FlushAndGetStats().interframe_delay_max_ms);
 
   fake_clock_.AdvanceTimeMilliseconds(kInterframeDelayMs3);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
   // Now the first sample is out of the window, so the second is the maximum.
-  EXPECT_EQ(kInterframeDelayMs2,
-            statistics_proxy_->GetStats().interframe_delay_max_ms);
+  EXPECT_EQ(kInterframeDelayMs2, FlushAndGetStats().interframe_delay_max_ms);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, ReportsFreezeMetrics) {
@@ -434,7 +439,7 @@
   EXPECT_EQ(absl::nullopt, statistics_proxy_->GetStats().qp_sum);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(absl::nullopt, statistics_proxy_->GetStats().qp_sum);
+  EXPECT_EQ(absl::nullopt, FlushAndGetStats().qp_sum);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, OnDecodedFrameWithoutQpResetsQpSum) {
@@ -442,10 +447,10 @@
   EXPECT_EQ(absl::nullopt, statistics_proxy_->GetStats().qp_sum);
   statistics_proxy_->OnDecodedFrame(frame, 3u, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(3u, statistics_proxy_->GetStats().qp_sum);
+  EXPECT_EQ(3u, FlushAndGetStats().qp_sum);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0,
                                     VideoContentType::UNSPECIFIED);
-  EXPECT_EQ(absl::nullopt, statistics_proxy_->GetStats().qp_sum);
+  EXPECT_EQ(absl::nullopt, FlushAndGetStats().qp_sum);
 }
 
 TEST_F(ReceiveStatisticsProxy2Test, OnRenderedFrameIncreasesFramesRendered) {
@@ -920,6 +925,7 @@
     fake_clock_.AdvanceTimeMilliseconds(1000 / kDefaultFps);
   }
 
+  loop_.Flush();
   EXPECT_EQ(kDefaultFps, statistics_proxy_->GetStats().decode_frame_rate);
   EXPECT_EQ(kDefaultFps, statistics_proxy_->GetStats().render_frame_rate);
 
@@ -991,8 +997,7 @@
 
   // Min run time has passed.
   fake_clock_.AdvanceTimeMilliseconds((metrics::kMinRunTimeInSeconds * 1000));
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(1,
                    metrics::NumSamples("WebRTC.Video.DelayedFramesToRenderer"));
   EXPECT_METRIC_EQ(
@@ -1049,8 +1054,7 @@
 
   // Min run time has passed.
   fake_clock_.AdvanceTimeMilliseconds((metrics::kMinRunTimeInSeconds * 1000));
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(1,
                    metrics::NumSamples("WebRTC.Video.DelayedFramesToRenderer"));
   EXPECT_METRIC_EQ(
@@ -1081,8 +1085,7 @@
 
   // Min run time has passed.
   fake_clock_.AdvanceTimeMilliseconds((metrics::kMinRunTimeInSeconds * 1000));
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(1,
                    metrics::NumSamples("WebRTC.Video.DelayedFramesToRenderer"));
   EXPECT_METRIC_EQ(
@@ -1224,8 +1227,7 @@
   fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   const int kExpectedInterFrame =
       (kInterFrameDelayMs * (kMinRequiredSamples - 1) +
        kInterFrameDelayMs * 2) /
@@ -1264,8 +1266,7 @@
   fake_clock_.AdvanceTimeMilliseconds(10 * kInterFrameDelayMs);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   const int kExpectedInterFrame = kInterFrameDelayMs * 2;
   if (videocontenttypehelpers::IsScreenshare(content_type_)) {
     EXPECT_METRIC_EQ(
@@ -1312,6 +1313,7 @@
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
 
+  loop_.Flush();
   // At this state, we should have a valid inter-frame delay.
   // Indicate stream paused and make a large jump in time.
   statistics_proxy_->OnStreamInactive();
@@ -1323,8 +1325,7 @@
   fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   if (videocontenttypehelpers::IsScreenshare(content_type_)) {
     EXPECT_METRIC_EQ(
         1, metrics::NumSamples("WebRTC.Video.Screenshare.InterframeDelayInMs"));
@@ -1356,17 +1357,18 @@
   webrtc::VideoFrame frame = CreateFrame(kWidth, kHeight);
 
   for (int i = 0; i < kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame));
+    VideoFrameMetaData meta = MetaData(frame);
+    statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // Add extra freeze.
   fake_clock_.AdvanceTimeMilliseconds(kFreezeDelayMs);
-  statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
-  statistics_proxy_->OnRenderedFrame(MetaData(frame));
+  VideoFrameMetaData meta = MetaData(frame);
+  statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+  statistics_proxy_->OnRenderedFrame(meta);
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   const int kExpectedTimeBetweenFreezes =
       kInterFrameDelayMs * (kMinRequiredSamples - 1);
   const int kExpectedNumberFreezesPerMinute = 60 * 1000 / kCallDurationMs;
@@ -1407,18 +1409,19 @@
 
   // Freezes and pauses should be included into harmonic frame rate.
   // Add freeze.
+  loop_.Flush();
   fake_clock_.AdvanceTimeMilliseconds(kFreezeDurationMs);
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
   statistics_proxy_->OnRenderedFrame(MetaData(frame));
 
   // Add pause.
+  loop_.Flush();
   fake_clock_.AdvanceTimeMilliseconds(kPauseDurationMs);
   statistics_proxy_->OnStreamInactive();
   statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
   statistics_proxy_->OnRenderedFrame(MetaData(frame));
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   double kSumSquaredFrameDurationSecs =
       (kMinRequiredSamples - 1) *
       (kFrameDurationMs / 1000.0 * kFrameDurationMs / 1000.0);
@@ -1444,23 +1447,23 @@
   webrtc::VideoFrame frame = CreateFrame(kWidth, kHeight);
 
   for (int i = 0; i <= kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame));
+    VideoFrameMetaData meta = MetaData(frame);
+    statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // Add a pause.
   fake_clock_.AdvanceTimeMilliseconds(kPauseDurationMs);
   statistics_proxy_->OnStreamInactive();
-
   // Second playback interval with triple the length.
   for (int i = 0; i <= kMinRequiredSamples * 3; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame));
+    VideoFrameMetaData meta = MetaData(frame);
+    statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   // Average of two playback intervals.
   const int kExpectedTimeBetweenFreezes =
       kInterFrameDelayMs * kMinRequiredSamples * 2;
@@ -1515,16 +1518,16 @@
 
   // HD frames.
   for (int i = 0; i < kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame_hd, absl::nullopt, 0,
-                                      content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame_hd));
+    VideoFrameMetaData meta = MetaData(frame_hd);
+    statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // SD frames.
   for (int i = 0; i < 2 * kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame_sd, absl::nullopt, 0,
-                                      content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame_sd));
+    VideoFrameMetaData meta = MetaData(frame_sd);
+    statistics_proxy_->OnDecodedFrame(meta, absl::nullopt, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // Extra last frame.
@@ -1551,22 +1554,23 @@
 
   // High quality frames.
   for (int i = 0; i < kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame, kLowQp, 0, content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame));
+    VideoFrameMetaData meta = MetaData(frame);
+    statistics_proxy_->OnDecodedFrame(meta, kLowQp, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // Blocky frames.
   for (int i = 0; i < 2 * kMinRequiredSamples; ++i) {
-    statistics_proxy_->OnDecodedFrame(frame, kHighQp, 0, content_type_);
-    statistics_proxy_->OnRenderedFrame(MetaData(frame));
+    VideoFrameMetaData meta = MetaData(frame);
+    statistics_proxy_->OnDecodedFrame(meta, kHighQp, 0, content_type_);
+    statistics_proxy_->OnRenderedFrame(meta);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
   // Extra last frame.
   statistics_proxy_->OnDecodedFrame(frame, kHighQp, 0, content_type_);
   statistics_proxy_->OnRenderedFrame(MetaData(frame));
 
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   const int kExpectedTimeInHdPercents = 66;
   if (videocontenttypehelpers::IsScreenshare(content_type_)) {
     EXPECT_METRIC_EQ(
@@ -1590,6 +1594,7 @@
   // Call once to pass content type.
   statistics_proxy_->OnDecodedFrame(frame_hd, absl::nullopt, 0, content_type_);
 
+  loop_.Flush();
   statistics_proxy_->OnRenderedFrame(MetaData(frame_hd));
   fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
 
@@ -1627,8 +1632,7 @@
     statistics_proxy_->OnDecodedFrame(frame, kLowQp, kDecodeMs, content_type_);
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs);
   }
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
   EXPECT_METRIC_EQ(
       1, metrics::NumEvents("WebRTC.Video.DecodeTimeInMs", kDecodeMs));
 }
@@ -1653,8 +1657,7 @@
     fake_clock_.AdvanceTimeMilliseconds(kInterFrameDelayMs2);
     statistics_proxy_->OnDecodedFrame(frame, absl::nullopt, 0, content_type);
   }
-  statistics_proxy_->UpdateHistograms(absl::nullopt, StreamDataCounters(),
-                                      nullptr);
+  FlushAndUpdateHistograms(absl::nullopt, StreamDataCounters(), nullptr);
 
   if (videocontenttypehelpers::IsScreenshare(content_type)) {
     EXPECT_METRIC_EQ(
@@ -1776,6 +1779,8 @@
     fake_clock_.AdvanceTimeMilliseconds(kFrameDurationMs);
   }
 
+  loop_.Flush();
+
   EXPECT_METRIC_EQ(expected_number_of_samples_,
                    metrics::NumSamples(uma_histogram_name_));
   EXPECT_METRIC_EQ(expected_number_of_samples_,
diff --git a/video/video_receive_stream2.cc b/video/video_receive_stream2.cc
index 6649fca..dbc3455 100644
--- a/video/video_receive_stream2.cc
+++ b/video/video_receive_stream2.cc
@@ -706,8 +706,12 @@
   // To avoid spamming keyframe requests for a stream that is not active we
   // check if we have received a packet within the last 5 seconds.
   bool stream_is_active = last_packet_ms && now_ms - *last_packet_ms < 5000;
-  if (!stream_is_active)
-    stats_proxy_.OnStreamInactive();
+  if (!stream_is_active) {
+    worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [this]() {
+      RTC_DCHECK_RUN_ON(&worker_sequence_checker_);
+      stats_proxy_.OnStreamInactive();
+    }));
+  }
 
   if (stream_is_active && !IsReceivingKeyFrame(now_ms) &&
       (!config_.crypto_options.sframe.require_frame_encryption ||