RtcEventLog: Separate `LogToMemory` from TaskQueue to current thread

Original implementation uses TaskQueue to async execute both `LogToMemory` and `WriteToOutput`. `LogToMemory` is invoked in high frequency, but the execution takes a very short time. It would be a bit more expensive to post on TaskQueue than execution on current thread with locking. It is because that the TaskQueue switches the thread context for execution.

This CL separates `LogToMemory` from TaskQueue to current thread, in
order to avoid frequent context switching; And periodically schedule
`WriteToOutput` to TaskQueue, not block current thread.

Link: https://webrtc-review.googlesource.com/c/src/+/283641
Bug: chromium:1288710
Change-Id: Ic78216aff16d1883b109e360a0892da3ca8f5ecb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295640
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Markus Handell <handellm@google.com>
Cr-Commit-Position: refs/heads/main@{#39629}
diff --git a/logging/rtc_event_log/rtc_event_log_impl.cc b/logging/rtc_event_log/rtc_event_log_impl.cc
index 31f1e55..f2b3f22 100644
--- a/logging/rtc_event_log/rtc_event_log_impl.cc
+++ b/logging/rtc_event_log/rtc_event_log_impl.cc
@@ -17,7 +17,6 @@
 #include <vector>
 
 #include "absl/strings/string_view.h"
-#include "absl/types/optional.h"
 #include "api/task_queue/task_queue_base.h"
 #include "api/units/time_delta.h"
 #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_legacy.h"
@@ -55,10 +54,7 @@
     : max_events_in_history_(max_events_in_history),
       max_config_events_in_history_(max_config_events_in_history),
       event_encoder_(std::move(encoder)),
-      num_config_events_written_(0),
       last_output_ms_(rtc::TimeMillis()),
-      output_scheduled_(false),
-      logging_state_started_(false),
       task_queue_(
           std::make_unique<rtc::TaskQueue>(task_queue_factory->CreateTaskQueue(
               "rtc_event_log",
@@ -66,7 +62,11 @@
 
 RtcEventLogImpl::~RtcEventLogImpl() {
   // If we're logging to the output, this will stop that. Blocking function.
-  if (logging_state_started_) {
+  mutex_.Lock();
+  bool started = logging_state_started_;
+  mutex_.Unlock();
+
+  if (started) {
     logging_state_checker_.Detach();
     StopLogging();
   }
@@ -80,6 +80,7 @@
 
 bool RtcEventLogImpl::StartLogging(std::unique_ptr<RtcEventLogOutput> output,
                                    int64_t output_period_ms) {
+  RTC_DCHECK(output);
   RTC_DCHECK(output_period_ms == kImmediateOutput || output_period_ms > 0);
 
   if (!output->IsActive()) {
@@ -94,17 +95,39 @@
                    << timestamp_us << ", " << utc_time_us << ").";
 
   RTC_DCHECK_RUN_ON(&logging_state_checker_);
+  MutexLock lock(&mutex_);
   logging_state_started_ = true;
+  immediately_output_mode_ = (output_period_ms == kImmediateOutput);
+  need_schedule_output_ = (output_period_ms != kImmediateOutput);
+
   // Binding to `this` is safe because `this` outlives the `task_queue_`.
   task_queue_->PostTask([this, output_period_ms, timestamp_us, utc_time_us,
-                         output = std::move(output)]() mutable {
+                         output = std::move(output),
+                         histories = ExtractRecentHistories()]() mutable {
     RTC_DCHECK_RUN_ON(task_queue_.get());
+    RTC_DCHECK(output);
     RTC_DCHECK(output->IsActive());
     output_period_ms_ = output_period_ms;
     event_output_ = std::move(output);
-    num_config_events_written_ = 0;
+
     WriteToOutput(event_encoder_->EncodeLogStart(timestamp_us, utc_time_us));
-    LogEventsFromMemoryToOutput();
+    // Load all configs of previous sessions.
+    if (!all_config_history_.empty()) {
+      EventDeque& history = histories.config_history;
+      history.insert(history.begin(),
+                     std::make_move_iterator(all_config_history_.begin()),
+                     std::make_move_iterator(all_config_history_.end()));
+      all_config_history_.clear();
+
+      if (history.size() > max_config_events_in_history_) {
+        RTC_LOG(LS_WARNING)
+            << "Dropping config events: " << history.size()
+            << " exceeds maximum " << max_config_events_in_history_;
+        history.erase(history.begin(), history.begin() + history.size() -
+                                           max_config_events_in_history_);
+      }
+    }
+    LogEventsToOutput(std::move(histories));
   });
 
   return true;
@@ -124,97 +147,111 @@
 
 void RtcEventLogImpl::StopLogging(std::function<void()> callback) {
   RTC_DCHECK_RUN_ON(&logging_state_checker_);
+  MutexLock lock(&mutex_);
   logging_state_started_ = false;
-  task_queue_->PostTask([this, callback] {
-    RTC_DCHECK_RUN_ON(task_queue_.get());
-    if (event_output_) {
-      RTC_DCHECK(event_output_->IsActive());
-      LogEventsFromMemoryToOutput();
-    }
-    StopLoggingInternal();
-    callback();
-  });
+  task_queue_->PostTask(
+      [this, callback, histories = ExtractRecentHistories()]() mutable {
+        RTC_DCHECK_RUN_ON(task_queue_.get());
+        if (event_output_) {
+          RTC_DCHECK(event_output_->IsActive());
+          LogEventsToOutput(std::move(histories));
+        }
+        StopLoggingInternal();
+        callback();
+      });
+}
+
+RtcEventLogImpl::EventHistories RtcEventLogImpl::ExtractRecentHistories() {
+  EventHistories histories;
+  std::swap(histories, recent_);
+  return histories;
 }
 
 void RtcEventLogImpl::Log(std::unique_ptr<RtcEvent> event) {
   RTC_CHECK(event);
+  MutexLock lock(&mutex_);
 
-  // Binding to `this` is safe because `this` outlives the `task_queue_`.
-  task_queue_->PostTask([this, event = std::move(event)]() mutable {
-    RTC_DCHECK_RUN_ON(task_queue_.get());
-    LogToMemory(std::move(event));
-    if (event_output_)
-      ScheduleOutput();
-  });
+  LogToMemory(std::move(event));
+  if (logging_state_started_) {
+    if (ShouldOutputImmediately()) {
+      // Binding to `this` is safe because `this` outlives the `task_queue_`.
+      task_queue_->PostTask(
+          [this, histories = ExtractRecentHistories()]() mutable {
+            RTC_DCHECK_RUN_ON(task_queue_.get());
+            if (event_output_) {
+              RTC_DCHECK(event_output_->IsActive());
+              LogEventsToOutput(std::move(histories));
+            }
+          });
+    } else if (need_schedule_output_) {
+      need_schedule_output_ = false;
+      // Binding to `this` is safe because `this` outlives the `task_queue_`.
+      task_queue_->PostTask([this]() mutable {
+        RTC_DCHECK_RUN_ON(task_queue_.get());
+        if (event_output_) {
+          RTC_DCHECK(event_output_->IsActive());
+          ScheduleOutput();
+        }
+      });
+    }
+  }
+}
+
+bool RtcEventLogImpl::ShouldOutputImmediately() {
+  if (recent_.history.size() >= max_events_in_history_) {
+    // We have to emergency drain the buffer. We can't wait for the scheduled
+    // output task because there might be other event incoming before that.
+    return true;
+  }
+
+  return immediately_output_mode_;
 }
 
 void RtcEventLogImpl::ScheduleOutput() {
-  RTC_DCHECK(event_output_ && event_output_->IsActive());
-  if (history_.size() >= max_events_in_history_) {
-    // We have to emergency drain the buffer. We can't wait for the scheduled
-    // output task because there might be other event incoming before that.
-    LogEventsFromMemoryToOutput();
-    return;
-  }
-
-  RTC_DCHECK(output_period_ms_.has_value());
-  if (*output_period_ms_ == kImmediateOutput) {
-    // We are already on the `task_queue_` so there is no reason to post a task
-    // if we want to output immediately.
-    LogEventsFromMemoryToOutput();
-    return;
-  }
-
-  if (!output_scheduled_) {
-    output_scheduled_ = true;
-    // Binding to `this` is safe because `this` outlives the `task_queue_`.
-    auto output_task = [this]() {
-      RTC_DCHECK_RUN_ON(task_queue_.get());
-      if (event_output_) {
-        RTC_DCHECK(event_output_->IsActive());
-        LogEventsFromMemoryToOutput();
-      }
-      output_scheduled_ = false;
-    };
-    const int64_t now_ms = rtc::TimeMillis();
-    const int64_t time_since_output_ms = now_ms - last_output_ms_;
-    const uint32_t delay = rtc::SafeClamp(
-        *output_period_ms_ - time_since_output_ms, 0, *output_period_ms_);
-    task_queue_->PostDelayedTask(std::move(output_task),
-                                 TimeDelta::Millis(delay));
-  }
+  RTC_DCHECK(output_period_ms_ != kImmediateOutput);
+  // Binding to `this` is safe because `this` outlives the `task_queue_`.
+  auto output_task = [this]() {
+    RTC_DCHECK_RUN_ON(task_queue_.get());
+    // Allow scheduled output if the `event_output_` is valid.
+    if (event_output_) {
+      RTC_DCHECK(event_output_->IsActive());
+      mutex_.Lock();
+      RTC_DCHECK(!need_schedule_output_);
+      // Let the next `Log()` to schedule output.
+      need_schedule_output_ = true;
+      EventHistories histories = ExtractRecentHistories();
+      mutex_.Unlock();
+      LogEventsToOutput(std::move(histories));
+    }
+  };
+  const int64_t now_ms = rtc::TimeMillis();
+  const int64_t time_since_output_ms = now_ms - last_output_ms_;
+  const int32_t delay = rtc::SafeClamp(output_period_ms_ - time_since_output_ms,
+                                       0, output_period_ms_);
+  task_queue_->PostDelayedTask(std::move(output_task),
+                               TimeDelta::Millis(delay));
 }
 
 void RtcEventLogImpl::LogToMemory(std::unique_ptr<RtcEvent> event) {
-  std::deque<std::unique_ptr<RtcEvent>>& container =
-      event->IsConfigEvent() ? config_history_ : history_;
+  EventDeque& container =
+      event->IsConfigEvent() ? recent_.config_history : recent_.history;
   const size_t container_max_size = event->IsConfigEvent()
                                         ? max_config_events_in_history_
                                         : max_events_in_history_;
 
-  if (container.size() >= container_max_size) {
-    RTC_DCHECK(!event_output_);  // Shouldn't lose events if we have an output.
+  // Shouldn't lose events if started.
+  if (container.size() >= container_max_size && !logging_state_started_) {
     container.pop_front();
   }
   container.push_back(std::move(event));
 }
 
-void RtcEventLogImpl::LogEventsFromMemoryToOutput() {
-  RTC_DCHECK(event_output_ && event_output_->IsActive());
+void RtcEventLogImpl::LogEventsToOutput(EventHistories histories) {
   last_output_ms_ = rtc::TimeMillis();
 
-  // Serialize all stream configurations that haven't already been written to
-  // this output. `num_config_events_written_` is used to track which configs we
-  // have already written. (Note that the config may have been written to
-  // previous outputs; configs are not discarded.)
-  std::string encoded_configs;
-  RTC_DCHECK_LE(num_config_events_written_, config_history_.size());
-  if (num_config_events_written_ < config_history_.size()) {
-    const auto begin = config_history_.begin() + num_config_events_written_;
-    const auto end = config_history_.end();
-    encoded_configs = event_encoder_->EncodeBatch(begin, end);
-    num_config_events_written_ = config_history_.size();
-  }
+  // Serialize the stream configurations.
+  std::string encoded_configs = event_encoder_->EncodeBatch(
+      histories.config_history.begin(), histories.config_history.end());
 
   // Serialize the events in the event queue. Note that the write may fail,
   // for example if we are writing to a file and have reached the maximum limit.
@@ -223,11 +260,26 @@
   // log is started immediately after the first one becomes full, then one
   // cannot rely on the second log to contain everything that isn't in the first
   // log; one batch of events might be missing.
-  std::string encoded_history =
-      event_encoder_->EncodeBatch(history_.begin(), history_.end());
-  history_.clear();
+  std::string encoded_history = event_encoder_->EncodeBatch(
+      histories.history.begin(), histories.history.end());
 
   WriteConfigsAndHistoryToOutput(encoded_configs, encoded_history);
+
+  // Unlike other events, the configs are retained. If we stop/start logging
+  // again, these configs are used to interpret other events.
+  all_config_history_.insert(
+      all_config_history_.end(),
+      std::make_move_iterator(histories.config_history.begin()),
+      std::make_move_iterator(histories.config_history.end()));
+  if (all_config_history_.size() > max_config_events_in_history_) {
+    RTC_LOG(LS_WARNING) << "Dropping config events: "
+                        << all_config_history_.size() << " exceeds maximum "
+                        << max_config_events_in_history_;
+    all_config_history_.erase(all_config_history_.begin(),
+                              all_config_history_.begin() +
+                                  all_config_history_.size() -
+                                  max_config_events_in_history_);
+  }
 }
 
 void RtcEventLogImpl::WriteConfigsAndHistoryToOutput(
@@ -263,13 +315,14 @@
 }
 
 void RtcEventLogImpl::WriteToOutput(absl::string_view output_string) {
-  RTC_DCHECK(event_output_ && event_output_->IsActive());
-  if (!event_output_->Write(output_string)) {
-    RTC_LOG(LS_ERROR) << "Failed to write RTC event to output.";
-    // The first failure closes the output.
-    RTC_DCHECK(!event_output_->IsActive());
-    StopOutput();  // Clean-up.
-    return;
+  if (event_output_) {
+    RTC_DCHECK(event_output_->IsActive());
+    if (!event_output_->Write(output_string)) {
+      RTC_LOG(LS_ERROR) << "Failed to write RTC event to output.";
+      // The first failure closes the output.
+      RTC_DCHECK(!event_output_->IsActive());
+      StopOutput();  // Clean-up.
+    }
   }
 }
 
diff --git a/logging/rtc_event_log/rtc_event_log_impl.h b/logging/rtc_event_log/rtc_event_log_impl.h
index 6ffed0a..3187a7f 100644
--- a/logging/rtc_event_log/rtc_event_log_impl.h
+++ b/logging/rtc_event_log/rtc_event_log_impl.h
@@ -18,7 +18,6 @@
 #include <string>
 
 #include "absl/strings/string_view.h"
-#include "absl/types/optional.h"
 #include "api/rtc_event_log/rtc_event.h"
 #include "api/rtc_event_log/rtc_event_log.h"
 #include "api/rtc_event_log_output.h"
@@ -60,11 +59,23 @@
   void StopLogging() override;
   void StopLogging(std::function<void()> callback) override;
 
+  // Records event into `recent_` on current thread, and schedules the output on
+  // task queue if the buffers are full or `output_period_ms_` is expired.
   void Log(std::unique_ptr<RtcEvent> event) override;
 
  private:
-  void LogToMemory(std::unique_ptr<RtcEvent> event) RTC_RUN_ON(task_queue_);
-  void LogEventsFromMemoryToOutput() RTC_RUN_ON(task_queue_);
+  using EventDeque = std::deque<std::unique_ptr<RtcEvent>>;
+
+  struct EventHistories {
+    EventDeque config_history;
+    EventDeque history;
+  };
+
+  // Helper to extract and clear `recent_`.
+  EventHistories ExtractRecentHistories() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  void LogToMemory(std::unique_ptr<RtcEvent> event)
+      RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+  void LogEventsToOutput(EventHistories histories) RTC_RUN_ON(task_queue_);
 
   void StopOutput() RTC_RUN_ON(task_queue_);
 
@@ -75,6 +86,7 @@
 
   void StopLoggingInternal() RTC_RUN_ON(task_queue_);
 
+  bool ShouldOutputImmediately() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
   void ScheduleOutput() RTC_RUN_ON(task_queue_);
 
   // Max size of event history.
@@ -84,29 +96,31 @@
   const size_t max_config_events_in_history_;
 
   // History containing all past configuration events.
-  std::deque<std::unique_ptr<RtcEvent>> config_history_
-      RTC_GUARDED_BY(*task_queue_);
+  EventDeque all_config_history_ RTC_GUARDED_BY(task_queue_);
 
-  // History containing the most recent (non-configuration) events (~10s).
-  std::deque<std::unique_ptr<RtcEvent>> history_ RTC_GUARDED_BY(*task_queue_);
+  // `config_history` containing the most recent configuration events.
+  // `history` containing the most recent (non-configuration) events (~10s).
+  EventHistories recent_ RTC_GUARDED_BY(mutex_);
 
   std::unique_ptr<RtcEventLogEncoder> event_encoder_
-      RTC_GUARDED_BY(*task_queue_);
-  std::unique_ptr<RtcEventLogOutput> event_output_ RTC_GUARDED_BY(*task_queue_);
+      RTC_GUARDED_BY(task_queue_);
+  std::unique_ptr<RtcEventLogOutput> event_output_ RTC_GUARDED_BY(task_queue_);
 
-  size_t num_config_events_written_ RTC_GUARDED_BY(*task_queue_);
-  absl::optional<int64_t> output_period_ms_ RTC_GUARDED_BY(*task_queue_);
-  int64_t last_output_ms_ RTC_GUARDED_BY(*task_queue_);
-  bool output_scheduled_ RTC_GUARDED_BY(*task_queue_);
+  int64_t output_period_ms_ RTC_GUARDED_BY(task_queue_);
+  int64_t last_output_ms_ RTC_GUARDED_BY(task_queue_);
 
   RTC_NO_UNIQUE_ADDRESS SequenceChecker logging_state_checker_;
-  bool logging_state_started_ RTC_GUARDED_BY(logging_state_checker_);
+  bool logging_state_started_ RTC_GUARDED_BY(mutex_) = false;
+  bool immediately_output_mode_ RTC_GUARDED_BY(mutex_) = false;
+  bool need_schedule_output_ RTC_GUARDED_BY(mutex_) = false;
 
   // Since we are posting tasks bound to `this`,  it is critical that the event
   // log and its members outlive `task_queue_`. Keep the `task_queue_`
   // last to ensure it destructs first, or else tasks living on the queue might
   // access other members after they've been torn down.
   std::unique_ptr<rtc::TaskQueue> task_queue_;
+
+  Mutex mutex_;
 };
 
 }  // namespace webrtc