Move ReceiveStatisticsProxy stats variables to the worker thread.

This reduces locking on the decoder thread and moves all stats
management to the worker thread, which also avoids contention between
querying for these stats and the threads where the media processing happens.

Bug: webrtc:11489,webrtc:11490
Change-Id: I802577eab6b48edcbe124c02a1b793a640b74181
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/174205
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Rasmus Brandt <brandtr@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31202}
diff --git a/video/receive_statistics_proxy2.cc b/video/receive_statistics_proxy2.cc
index cafea6c..0fda7c8 100644
--- a/video/receive_statistics_proxy2.cc
+++ b/video/receive_statistics_proxy2.cc
@@ -115,7 +115,6 @@
       video_quality_observer_(new VideoQualityObserver()),
       interframe_delay_max_moving_(kMovingMaxWindowMs),
       freq_offset_counter_(clock, nullptr, kFreqOffsetProcessIntervalMs),
-      avg_rtt_ms_(0),
       last_content_type_(VideoContentType::UNSPECIFIED),
       last_codec_type_(kVideoCodecVP8),
       num_delayed_frames_rendered_(0),
@@ -139,17 +138,11 @@
     const StreamDataCounters* rtx_stats) {
   RTC_DCHECK_RUN_ON(&main_thread_);
 
-  // TODO(bugs.webrtc.org/11489): Many of these variables don't need to be
-  // inside the scope of a lock. Also consider grabbing the lock only to copy
-  // the state that histograms need to be reported for, then report histograms
-  // while not holding the lock.
   char log_stream_buf[8 * 1024];
   rtc::SimpleStringBuilder log_stream(log_stream_buf);
 
   int stream_duration_sec = (clock_->TimeInMilliseconds() - start_ms_) / 1000;
 
-  rtc::CritScope lock(&crit_);
-
   if (stats_.frame_counts.key_frames > 0 ||
       stats_.frame_counts.delta_frames > 0) {
     RTC_HISTOGRAM_COUNTS_100000("WebRTC.Video.ReceiveStreamLifetimeInSeconds",
@@ -241,18 +234,12 @@
                << key_frames_permille << '\n';
   }
 
-  {
-    // We're not actually running on the decoder thread, but this function must
-    // be called after DecoderThreadStopped, which detaches the thread checker.
-    // It is therefore safe to access |qp_counters_|, which were updated on the
-    // decode thread earlier.
-    RTC_DCHECK_RUN_ON(&decode_queue_);
-    absl::optional<int> qp = qp_counters_.vp8.Avg(kMinRequiredSamples);
-    if (qp) {
-      RTC_HISTOGRAM_COUNTS_200("WebRTC.Video.Decoded.Vp8.Qp", *qp);
-      log_stream << "WebRTC.Video.Decoded.Vp8.Qp " << *qp << '\n';
-    }
+  absl::optional<int> qp = qp_counters_.vp8.Avg(kMinRequiredSamples);
+  if (qp) {
+    RTC_HISTOGRAM_COUNTS_200("WebRTC.Video.Decoded.Vp8.Qp", *qp);
+    log_stream << "WebRTC.Video.Decoded.Vp8.Qp " << *qp << '\n';
   }
+
   absl::optional<int> decode_ms = decode_time_counter_.Avg(kMinRequiredSamples);
   if (decode_ms) {
     RTC_HISTOGRAM_COUNTS_1000("WebRTC.Video.DecodeTimeInMs", *decode_ms);
@@ -579,7 +566,6 @@
   size_t framerate =
       (frame_window_.size() * 1000 + 500) / kRateStatisticsWindowSizeMs;
 
-  rtc::CritScope lock(&crit_);
   stats_.network_frame_rate = static_cast<int>(framerate);
 }
 
@@ -664,7 +650,6 @@
   int64_t now_ms = clock_->TimeInMilliseconds();
   UpdateFramerate(now_ms);
 
-  rtc::CritScope lock(&crit_);
   stats_.render_frame_rate = renders_fps_estimator_.Rate(now_ms).value_or(0);
   stats_.decode_frame_rate = decode_fps_estimator_.Rate(now_ms).value_or(0);
 
@@ -704,17 +689,21 @@
 
 void ReceiveStatisticsProxy::OnIncomingPayloadType(int payload_type) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  rtc::CritScope lock(&crit_);
-  stats_.current_payload_type = payload_type;
+  worker_thread_->PostTask(
+      ToQueuedTask(task_safety_flag_, [payload_type, this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        stats_.current_payload_type = payload_type;
+      }));
 }
 
 void ReceiveStatisticsProxy::OnDecoderImplementationName(
     const char* implementation_name) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  // TODO(bugs.webrtc.org/11489): is a lock needed for this variable? Currently
-  // seems to be only touched on the decoder queue.
-  rtc::CritScope lock(&crit_);
-  stats_.decoder_implementation_name = implementation_name;
+  worker_thread_->PostTask(ToQueuedTask(
+      task_safety_flag_, [name = std::string(implementation_name), this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        stats_.decoder_implementation_name = name;
+      }));
 }
 
 void ReceiveStatisticsProxy::OnFrameBufferTimingsUpdated(
@@ -725,46 +714,52 @@
     int min_playout_delay_ms,
     int render_delay_ms) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  rtc::CritScope lock(&crit_);
-  stats_.max_decode_ms = max_decode_ms;
-  stats_.current_delay_ms = current_delay_ms;
-  stats_.target_delay_ms = target_delay_ms;
-  stats_.jitter_buffer_ms = jitter_buffer_ms;
-  stats_.min_playout_delay_ms = min_playout_delay_ms;
-  stats_.render_delay_ms = render_delay_ms;
-  jitter_buffer_delay_counter_.Add(jitter_buffer_ms);
-  target_delay_counter_.Add(target_delay_ms);
-  current_delay_counter_.Add(current_delay_ms);
-  // Network delay (rtt/2) + target_delay_ms (jitter delay + decode time +
-  // render delay).
-  delay_counter_.Add(target_delay_ms + avg_rtt_ms_ / 2);
+  worker_thread_->PostTask(ToQueuedTask(
+      task_safety_flag_,
+      [max_decode_ms, current_delay_ms, target_delay_ms, jitter_buffer_ms,
+       min_playout_delay_ms, render_delay_ms, this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        stats_.max_decode_ms = max_decode_ms;
+        stats_.current_delay_ms = current_delay_ms;
+        stats_.target_delay_ms = target_delay_ms;
+        stats_.jitter_buffer_ms = jitter_buffer_ms;
+        stats_.min_playout_delay_ms = min_playout_delay_ms;
+        stats_.render_delay_ms = render_delay_ms;
+        jitter_buffer_delay_counter_.Add(jitter_buffer_ms);
+        target_delay_counter_.Add(target_delay_ms);
+        current_delay_counter_.Add(current_delay_ms);
+        // Network delay (rtt/2) + target_delay_ms (jitter delay + decode time +
+        // render delay).
+        delay_counter_.Add(target_delay_ms + avg_rtt_ms_ / 2);
+      }));
 }
 
 void ReceiveStatisticsProxy::OnUniqueFramesCounted(int num_unique_frames) {
   RTC_DCHECK_RUN_ON(&main_thread_);
-  rtc::CritScope lock(&crit_);
   num_unique_frames_.emplace(num_unique_frames);
 }
 
 void ReceiveStatisticsProxy::OnTimingFrameInfoUpdated(
     const TimingFrameInfo& info) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  rtc::CritScope lock(&crit_);
-  if (info.flags != VideoSendTiming::kInvalid) {
-    int64_t now_ms = clock_->TimeInMilliseconds();
-    timing_frame_info_counter_.Add(info, now_ms);
-  }
+  worker_thread_->PostTask(ToQueuedTask(task_safety_flag_, [info, this]() {
+    RTC_DCHECK_RUN_ON(&main_thread_);
+    if (info.flags != VideoSendTiming::kInvalid) {
+      int64_t now_ms = clock_->TimeInMilliseconds();
+      timing_frame_info_counter_.Add(info, now_ms);
+    }
 
-  // Measure initial decoding latency between the first frame arriving and the
-  // first frame being decoded.
-  if (!first_frame_received_time_ms_.has_value()) {
-    first_frame_received_time_ms_ = info.receive_finish_ms;
-  }
-  if (stats_.first_frame_received_to_decoded_ms == -1 &&
-      first_decoded_frame_time_ms_) {
-    stats_.first_frame_received_to_decoded_ms =
-        *first_decoded_frame_time_ms_ - *first_frame_received_time_ms_;
-  }
+    // Measure initial decoding latency between the first frame arriving and
+    // the first frame being decoded.
+    if (!first_frame_received_time_ms_.has_value()) {
+      first_frame_received_time_ms_ = info.receive_finish_ms;
+    }
+    if (stats_.first_frame_received_to_decoded_ms == -1 &&
+        first_decoded_frame_time_ms_) {
+      stats_.first_frame_received_to_decoded_ms =
+          *first_decoded_frame_time_ms_ - *first_frame_received_time_ms_;
+    }
+  }));
 }
 
 void ReceiveStatisticsProxy::RtcpPacketTypesCounterUpdated(
@@ -793,7 +788,6 @@
   }
 
   RTC_DCHECK_RUN_ON(&main_thread_);
-  rtc::CritScope lock(&crit_);
   stats_.rtcp_packet_type_counts = packet_counter;
 }
 
@@ -804,7 +798,6 @@
   if (remote_ssrc_ != ssrc)
     return;
 
-  rtc::CritScope lock(&crit_);
   stats_.c_name = std::string(cname);
 }
 
@@ -816,12 +809,10 @@
   // may be on. E.g. on iOS this gets called on
   // "com.apple.coremedia.decompressionsession.clientcallback"
   VideoFrameMetaData meta(frame, clock_->CurrentTime());
-  worker_thread_->PostTask(ToQueuedTask([safety = task_safety_flag_, meta, qp,
-                                         decode_time_ms, content_type, this]() {
-    if (!safety->alive())
-      return;
-    OnDecodedFrame(meta, qp, decode_time_ms, content_type);
-  }));
+  worker_thread_->PostTask(ToQueuedTask(
+      task_safety_flag_, [meta, qp, decode_time_ms, content_type, this]() {
+        OnDecodedFrame(meta, qp, decode_time_ms, content_type);
+      }));
 }
 
 void ReceiveStatisticsProxy::OnDecodedFrame(
@@ -843,12 +834,12 @@
     video_quality_observer_.reset(new VideoQualityObserver());
   }
 
-  rtc::CritScope lock(&crit_);
   video_quality_observer_->OnDecodedFrame(frame_meta.rtp_timestamp, qp,
                                           last_codec_type_);
 
   ContentSpecificStats* content_specific_stats =
       &content_specific_stats_[content_type];
+
   ++stats_.frames_decoded;
   if (qp) {
     if (!stats_.qp_sum) {
@@ -905,17 +896,16 @@
   RTC_DCHECK_GT(frame_meta.width, 0);
   RTC_DCHECK_GT(frame_meta.height, 0);
 
-  // TODO(bugs.webrtc.org/11489): Remove lock once sync isn't needed.
-  rtc::CritScope lock(&crit_);
-
   video_quality_observer_->OnRenderedFrame(frame_meta);
 
   ContentSpecificStats* content_specific_stats =
       &content_specific_stats_[last_content_type_];
   renders_fps_estimator_.Update(1, frame_meta.decode_timestamp.ms());
+
   ++stats_.frames_rendered;
   stats_.width = frame_meta.width;
   stats_.height = frame_meta.height;
+
   render_fps_tracker_.AddSamples(1);
   render_pixel_tracker_.AddSamples(sqrt(frame_meta.width * frame_meta.height));
   content_specific_stats->received_width.Add(frame_meta.width);
@@ -936,6 +926,7 @@
       content_specific_stats->e2e_delay_counter.Add(delay_ms);
     }
   }
+
   QualitySample(frame_meta.decode_timestamp);
 }
 
@@ -943,28 +934,32 @@
                                                  int64_t sync_offset_ms,
                                                  double estimated_freq_khz) {
   RTC_DCHECK_RUN_ON(&incoming_render_queue_);
-  rtc::CritScope lock(&crit_);
-  // TODO(bugs.webrtc.org/11489): Lock possibly not needed for
-  // sync_offset_counter_ if it's only touched on the decoder thread.
-  sync_offset_counter_.Add(std::abs(sync_offset_ms));
-  stats_.sync_offset_ms = sync_offset_ms;
-  last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms;
-  last_estimated_playout_time_ms_ = clock_->TimeInMilliseconds();
+  int64_t now_ms = clock_->TimeInMilliseconds();
+  worker_thread_->PostTask(
+      ToQueuedTask(task_safety_flag_, [video_playout_ntp_ms, sync_offset_ms,
+                                       estimated_freq_khz, now_ms, this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        sync_offset_counter_.Add(std::abs(sync_offset_ms));
+        stats_.sync_offset_ms = sync_offset_ms;
+        last_estimated_playout_ntp_timestamp_ms_ = video_playout_ntp_ms;
+        last_estimated_playout_time_ms_ = now_ms;
 
-  const double kMaxFreqKhz = 10000.0;
-  int offset_khz = kMaxFreqKhz;
-  // Should not be zero or negative. If so, report max.
-  if (estimated_freq_khz < kMaxFreqKhz && estimated_freq_khz > 0.0)
-    offset_khz = static_cast<int>(std::fabs(estimated_freq_khz - 90.0) + 0.5);
+        const double kMaxFreqKhz = 10000.0;
+        int offset_khz = kMaxFreqKhz;
+        // Should not be zero or negative. If so, report max.
+        if (estimated_freq_khz < kMaxFreqKhz && estimated_freq_khz > 0.0)
+          offset_khz =
+              static_cast<int>(std::fabs(estimated_freq_khz - 90.0) + 0.5);
 
-  freq_offset_counter_.Add(offset_khz);
+        freq_offset_counter_.Add(offset_khz);
+      }));
 }
 
 void ReceiveStatisticsProxy::OnCompleteFrame(bool is_keyframe,
                                              size_t size_bytes,
                                              VideoContentType content_type) {
   RTC_DCHECK_RUN_ON(&main_thread_);
-  rtc::CritScope lock(&crit_);
+
   if (is_keyframe) {
     ++stats_.frame_counts.key_frames;
   } else {
@@ -994,18 +989,25 @@
 }
 
 void ReceiveStatisticsProxy::OnDroppedFrames(uint32_t frames_dropped) {
-  rtc::CritScope lock(&crit_);
-  stats_.frames_dropped += frames_dropped;
+  RTC_DCHECK_RUN_ON(&decode_queue_);
+  worker_thread_->PostTask(
+      ToQueuedTask(task_safety_flag_, [frames_dropped, this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        stats_.frames_dropped += frames_dropped;
+      }));
 }
 
 void ReceiveStatisticsProxy::OnPreDecode(VideoCodecType codec_type, int qp) {
   RTC_DCHECK_RUN_ON(&decode_queue_);
-  rtc::CritScope lock(&crit_);
-  last_codec_type_ = codec_type;
-  if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
-    qp_counters_.vp8.Add(qp);
-    qp_sample_.Add(qp);
-  }
+  worker_thread_->PostTask(
+      ToQueuedTask(task_safety_flag_, [codec_type, qp, this]() {
+        RTC_DCHECK_RUN_ON(&main_thread_);
+        last_codec_type_ = codec_type;
+        if (last_codec_type_ == kVideoCodecVP8 && qp != -1) {
+          qp_counters_.vp8.Add(qp);
+          qp_sample_.Add(qp);
+        }
+      }));
 }
 
 void ReceiveStatisticsProxy::OnStreamInactive() {
@@ -1019,18 +1021,13 @@
   video_quality_observer_->OnStreamInactive();
 }
 
-void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms,
-                                         int64_t max_rtt_ms) {
-  // TODO(bugs.webrtc.org/11489): Is this a duplicate of
-  // VideoReceiveStream::OnRttUpdate?
-  // - looks like that runs on a/the module process thread.
-  //
-
-  // BUGBUG
-  // Actually, it looks like this method is never called except from a unit
-  // test, GetStatsReportsDecodeTimingStats.
-  rtc::CritScope lock(&crit_);
-  avg_rtt_ms_ = avg_rtt_ms;
+void ReceiveStatisticsProxy::OnRttUpdate(int64_t avg_rtt_ms) {
+  // TODO(bugs.webrtc.org/11489): Currently only called from the unit test
+  // GetStatsReportsDecodeTimingStats, where it has no effect: the update of
+  // |avg_rtt_ms_| below is disabled. Once the bugs.webrtc.org/11490 items in
+  // video_receive_stream2.cc are addressed, re-enable these two lines:
+  // RTC_DCHECK_RUN_ON(&main_thread_);
+  // avg_rtt_ms_ = avg_rtt_ms;
 }
 
 void ReceiveStatisticsProxy::DecoderThreadStarting() {