RtcEventLog: Separate `LogToMemory` from TaskQueue to current thread
Original implementation uses TaskQueue to async execute both `LogToMemory` and `WriteToOutput`. `LogToMemory` is invoked in high frequency, but the execution takes a very short time. It would be a bit more expensive to post on TaskQueue than execution on current thread with locking. It is because that the TaskQueue switches the thread context for execution.
This CL separates `LogToMemory` from TaskQueue to current thread, in
order to avoid frequent context switching; And periodically schedule
`WriteToOutput` to TaskQueue, not block current thread.
Link: https://webrtc-review.googlesource.com/c/src/+/283641
Bug: chromium:1288710
Change-Id: Ic78216aff16d1883b109e360a0892da3ca8f5ecb
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/295640
Reviewed-by: Björn Terelius <terelius@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Markus Handell <handellm@google.com>
Cr-Commit-Position: refs/heads/main@{#39629}
diff --git a/logging/rtc_event_log/rtc_event_log_impl.cc b/logging/rtc_event_log/rtc_event_log_impl.cc
index 31f1e55..f2b3f22 100644
--- a/logging/rtc_event_log/rtc_event_log_impl.cc
+++ b/logging/rtc_event_log/rtc_event_log_impl.cc
@@ -17,7 +17,6 @@
#include <vector>
#include "absl/strings/string_view.h"
-#include "absl/types/optional.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h"
#include "logging/rtc_event_log/encoder/rtc_event_log_encoder_legacy.h"
@@ -55,10 +54,7 @@
: max_events_in_history_(max_events_in_history),
max_config_events_in_history_(max_config_events_in_history),
event_encoder_(std::move(encoder)),
- num_config_events_written_(0),
last_output_ms_(rtc::TimeMillis()),
- output_scheduled_(false),
- logging_state_started_(false),
task_queue_(
std::make_unique<rtc::TaskQueue>(task_queue_factory->CreateTaskQueue(
"rtc_event_log",
@@ -66,7 +62,11 @@
RtcEventLogImpl::~RtcEventLogImpl() {
// If we're logging to the output, this will stop that. Blocking function.
- if (logging_state_started_) {
+ mutex_.Lock();
+ bool started = logging_state_started_;
+ mutex_.Unlock();
+
+ if (started) {
logging_state_checker_.Detach();
StopLogging();
}
@@ -80,6 +80,7 @@
bool RtcEventLogImpl::StartLogging(std::unique_ptr<RtcEventLogOutput> output,
int64_t output_period_ms) {
+ RTC_DCHECK(output);
RTC_DCHECK(output_period_ms == kImmediateOutput || output_period_ms > 0);
if (!output->IsActive()) {
@@ -94,17 +95,39 @@
<< timestamp_us << ", " << utc_time_us << ").";
RTC_DCHECK_RUN_ON(&logging_state_checker_);
+ MutexLock lock(&mutex_);
logging_state_started_ = true;
+ immediately_output_mode_ = (output_period_ms == kImmediateOutput);
+ need_schedule_output_ = (output_period_ms != kImmediateOutput);
+
// Binding to `this` is safe because `this` outlives the `task_queue_`.
task_queue_->PostTask([this, output_period_ms, timestamp_us, utc_time_us,
- output = std::move(output)]() mutable {
+ output = std::move(output),
+ histories = ExtractRecentHistories()]() mutable {
RTC_DCHECK_RUN_ON(task_queue_.get());
+ RTC_DCHECK(output);
RTC_DCHECK(output->IsActive());
output_period_ms_ = output_period_ms;
event_output_ = std::move(output);
- num_config_events_written_ = 0;
+
WriteToOutput(event_encoder_->EncodeLogStart(timestamp_us, utc_time_us));
- LogEventsFromMemoryToOutput();
+ // Load all configs of previous sessions.
+ if (!all_config_history_.empty()) {
+ EventDeque& history = histories.config_history;
+ history.insert(history.begin(),
+ std::make_move_iterator(all_config_history_.begin()),
+ std::make_move_iterator(all_config_history_.end()));
+ all_config_history_.clear();
+
+ if (history.size() > max_config_events_in_history_) {
+ RTC_LOG(LS_WARNING)
+ << "Dropping config events: " << history.size()
+ << " exceeds maximum " << max_config_events_in_history_;
+ history.erase(history.begin(), history.begin() + history.size() -
+ max_config_events_in_history_);
+ }
+ }
+ LogEventsToOutput(std::move(histories));
});
return true;
@@ -124,97 +147,111 @@
void RtcEventLogImpl::StopLogging(std::function<void()> callback) {
RTC_DCHECK_RUN_ON(&logging_state_checker_);
+ MutexLock lock(&mutex_);
logging_state_started_ = false;
- task_queue_->PostTask([this, callback] {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- if (event_output_) {
- RTC_DCHECK(event_output_->IsActive());
- LogEventsFromMemoryToOutput();
- }
- StopLoggingInternal();
- callback();
- });
+ task_queue_->PostTask(
+ [this, callback, histories = ExtractRecentHistories()]() mutable {
+ RTC_DCHECK_RUN_ON(task_queue_.get());
+ if (event_output_) {
+ RTC_DCHECK(event_output_->IsActive());
+ LogEventsToOutput(std::move(histories));
+ }
+ StopLoggingInternal();
+ callback();
+ });
+}
+
+RtcEventLogImpl::EventHistories RtcEventLogImpl::ExtractRecentHistories() {
+ EventHistories histories;
+ std::swap(histories, recent_);
+ return histories;
}
void RtcEventLogImpl::Log(std::unique_ptr<RtcEvent> event) {
RTC_CHECK(event);
+ MutexLock lock(&mutex_);
- // Binding to `this` is safe because `this` outlives the `task_queue_`.
- task_queue_->PostTask([this, event = std::move(event)]() mutable {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- LogToMemory(std::move(event));
- if (event_output_)
- ScheduleOutput();
- });
+ LogToMemory(std::move(event));
+ if (logging_state_started_) {
+ if (ShouldOutputImmediately()) {
+ // Binding to `this` is safe because `this` outlives the `task_queue_`.
+ task_queue_->PostTask(
+ [this, histories = ExtractRecentHistories()]() mutable {
+ RTC_DCHECK_RUN_ON(task_queue_.get());
+ if (event_output_) {
+ RTC_DCHECK(event_output_->IsActive());
+ LogEventsToOutput(std::move(histories));
+ }
+ });
+ } else if (need_schedule_output_) {
+ need_schedule_output_ = false;
+ // Binding to `this` is safe because `this` outlives the `task_queue_`.
+ task_queue_->PostTask([this]() mutable {
+ RTC_DCHECK_RUN_ON(task_queue_.get());
+ if (event_output_) {
+ RTC_DCHECK(event_output_->IsActive());
+ ScheduleOutput();
+ }
+ });
+ }
+ }
+}
+
+bool RtcEventLogImpl::ShouldOutputImmediately() {
+ if (recent_.history.size() >= max_events_in_history_) {
+ // We have to emergency drain the buffer. We can't wait for the scheduled
+ // output task because there might be other event incoming before that.
+ return true;
+ }
+
+ return immediately_output_mode_;
}
void RtcEventLogImpl::ScheduleOutput() {
- RTC_DCHECK(event_output_ && event_output_->IsActive());
- if (history_.size() >= max_events_in_history_) {
- // We have to emergency drain the buffer. We can't wait for the scheduled
- // output task because there might be other event incoming before that.
- LogEventsFromMemoryToOutput();
- return;
- }
-
- RTC_DCHECK(output_period_ms_.has_value());
- if (*output_period_ms_ == kImmediateOutput) {
- // We are already on the `task_queue_` so there is no reason to post a task
- // if we want to output immediately.
- LogEventsFromMemoryToOutput();
- return;
- }
-
- if (!output_scheduled_) {
- output_scheduled_ = true;
- // Binding to `this` is safe because `this` outlives the `task_queue_`.
- auto output_task = [this]() {
- RTC_DCHECK_RUN_ON(task_queue_.get());
- if (event_output_) {
- RTC_DCHECK(event_output_->IsActive());
- LogEventsFromMemoryToOutput();
- }
- output_scheduled_ = false;
- };
- const int64_t now_ms = rtc::TimeMillis();
- const int64_t time_since_output_ms = now_ms - last_output_ms_;
- const uint32_t delay = rtc::SafeClamp(
- *output_period_ms_ - time_since_output_ms, 0, *output_period_ms_);
- task_queue_->PostDelayedTask(std::move(output_task),
- TimeDelta::Millis(delay));
- }
+ RTC_DCHECK(output_period_ms_ != kImmediateOutput);
+ // Binding to `this` is safe because `this` outlives the `task_queue_`.
+ auto output_task = [this]() {
+ RTC_DCHECK_RUN_ON(task_queue_.get());
+ // Allow scheduled output if the `event_output_` is valid.
+ if (event_output_) {
+ RTC_DCHECK(event_output_->IsActive());
+ mutex_.Lock();
+ RTC_DCHECK(!need_schedule_output_);
+ // Let the next `Log()` to schedule output.
+ need_schedule_output_ = true;
+ EventHistories histories = ExtractRecentHistories();
+ mutex_.Unlock();
+ LogEventsToOutput(std::move(histories));
+ }
+ };
+ const int64_t now_ms = rtc::TimeMillis();
+ const int64_t time_since_output_ms = now_ms - last_output_ms_;
+ const int32_t delay = rtc::SafeClamp(output_period_ms_ - time_since_output_ms,
+ 0, output_period_ms_);
+ task_queue_->PostDelayedTask(std::move(output_task),
+ TimeDelta::Millis(delay));
}
void RtcEventLogImpl::LogToMemory(std::unique_ptr<RtcEvent> event) {
- std::deque<std::unique_ptr<RtcEvent>>& container =
- event->IsConfigEvent() ? config_history_ : history_;
+ EventDeque& container =
+ event->IsConfigEvent() ? recent_.config_history : recent_.history;
const size_t container_max_size = event->IsConfigEvent()
? max_config_events_in_history_
: max_events_in_history_;
- if (container.size() >= container_max_size) {
- RTC_DCHECK(!event_output_); // Shouldn't lose events if we have an output.
+ // Shouldn't lose events if started.
+ if (container.size() >= container_max_size && !logging_state_started_) {
container.pop_front();
}
container.push_back(std::move(event));
}
-void RtcEventLogImpl::LogEventsFromMemoryToOutput() {
- RTC_DCHECK(event_output_ && event_output_->IsActive());
+void RtcEventLogImpl::LogEventsToOutput(EventHistories histories) {
last_output_ms_ = rtc::TimeMillis();
- // Serialize all stream configurations that haven't already been written to
- // this output. `num_config_events_written_` is used to track which configs we
- // have already written. (Note that the config may have been written to
- // previous outputs; configs are not discarded.)
- std::string encoded_configs;
- RTC_DCHECK_LE(num_config_events_written_, config_history_.size());
- if (num_config_events_written_ < config_history_.size()) {
- const auto begin = config_history_.begin() + num_config_events_written_;
- const auto end = config_history_.end();
- encoded_configs = event_encoder_->EncodeBatch(begin, end);
- num_config_events_written_ = config_history_.size();
- }
+ // Serialize the stream configurations.
+ std::string encoded_configs = event_encoder_->EncodeBatch(
+ histories.config_history.begin(), histories.config_history.end());
// Serialize the events in the event queue. Note that the write may fail,
// for example if we are writing to a file and have reached the maximum limit.
@@ -223,11 +260,26 @@
// log is started immediately after the first one becomes full, then one
// cannot rely on the second log to contain everything that isn't in the first
// log; one batch of events might be missing.
- std::string encoded_history =
- event_encoder_->EncodeBatch(history_.begin(), history_.end());
- history_.clear();
+ std::string encoded_history = event_encoder_->EncodeBatch(
+ histories.history.begin(), histories.history.end());
WriteConfigsAndHistoryToOutput(encoded_configs, encoded_history);
+
+ // Unlike other events, the configs are retained. If we stop/start logging
+ // again, these configs are used to interpret other events.
+ all_config_history_.insert(
+ all_config_history_.end(),
+ std::make_move_iterator(histories.config_history.begin()),
+ std::make_move_iterator(histories.config_history.end()));
+ if (all_config_history_.size() > max_config_events_in_history_) {
+ RTC_LOG(LS_WARNING) << "Dropping config events: "
+ << all_config_history_.size() << " exceeds maximum "
+ << max_config_events_in_history_;
+ all_config_history_.erase(all_config_history_.begin(),
+ all_config_history_.begin() +
+ all_config_history_.size() -
+ max_config_events_in_history_);
+ }
}
void RtcEventLogImpl::WriteConfigsAndHistoryToOutput(
@@ -263,13 +315,14 @@
}
void RtcEventLogImpl::WriteToOutput(absl::string_view output_string) {
- RTC_DCHECK(event_output_ && event_output_->IsActive());
- if (!event_output_->Write(output_string)) {
- RTC_LOG(LS_ERROR) << "Failed to write RTC event to output.";
- // The first failure closes the output.
- RTC_DCHECK(!event_output_->IsActive());
- StopOutput(); // Clean-up.
- return;
+ if (event_output_) {
+ RTC_DCHECK(event_output_->IsActive());
+ if (!event_output_->Write(output_string)) {
+ RTC_LOG(LS_ERROR) << "Failed to write RTC event to output.";
+ // The first failure closes the output.
+ RTC_DCHECK(!event_output_->IsActive());
+ StopOutput(); // Clean-up.
+ }
}
}
diff --git a/logging/rtc_event_log/rtc_event_log_impl.h b/logging/rtc_event_log/rtc_event_log_impl.h
index 6ffed0a..3187a7f 100644
--- a/logging/rtc_event_log/rtc_event_log_impl.h
+++ b/logging/rtc_event_log/rtc_event_log_impl.h
@@ -18,7 +18,6 @@
#include <string>
#include "absl/strings/string_view.h"
-#include "absl/types/optional.h"
#include "api/rtc_event_log/rtc_event.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/rtc_event_log_output.h"
@@ -60,11 +59,23 @@
void StopLogging() override;
void StopLogging(std::function<void()> callback) override;
+ // Records event into `recent_` on current thread, and schedules the output on
+ // task queue if the buffers are full or `output_period_ms_` is expired.
void Log(std::unique_ptr<RtcEvent> event) override;
private:
- void LogToMemory(std::unique_ptr<RtcEvent> event) RTC_RUN_ON(task_queue_);
- void LogEventsFromMemoryToOutput() RTC_RUN_ON(task_queue_);
+ using EventDeque = std::deque<std::unique_ptr<RtcEvent>>;
+
+ struct EventHistories {
+ EventDeque config_history;
+ EventDeque history;
+ };
+
+ // Helper to extract and clear `recent_`.
+ EventHistories ExtractRecentHistories() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ void LogToMemory(std::unique_ptr<RtcEvent> event)
+ RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
+ void LogEventsToOutput(EventHistories histories) RTC_RUN_ON(task_queue_);
void StopOutput() RTC_RUN_ON(task_queue_);
@@ -75,6 +86,7 @@
void StopLoggingInternal() RTC_RUN_ON(task_queue_);
+ bool ShouldOutputImmediately() RTC_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void ScheduleOutput() RTC_RUN_ON(task_queue_);
// Max size of event history.
@@ -84,29 +96,31 @@
const size_t max_config_events_in_history_;
// History containing all past configuration events.
- std::deque<std::unique_ptr<RtcEvent>> config_history_
- RTC_GUARDED_BY(*task_queue_);
+ EventDeque all_config_history_ RTC_GUARDED_BY(task_queue_);
- // History containing the most recent (non-configuration) events (~10s).
- std::deque<std::unique_ptr<RtcEvent>> history_ RTC_GUARDED_BY(*task_queue_);
+ // `config_history` containing the most recent configuration events.
+ // `history` containing the most recent (non-configuration) events (~10s).
+ EventHistories recent_ RTC_GUARDED_BY(mutex_);
std::unique_ptr<RtcEventLogEncoder> event_encoder_
- RTC_GUARDED_BY(*task_queue_);
- std::unique_ptr<RtcEventLogOutput> event_output_ RTC_GUARDED_BY(*task_queue_);
+ RTC_GUARDED_BY(task_queue_);
+ std::unique_ptr<RtcEventLogOutput> event_output_ RTC_GUARDED_BY(task_queue_);
- size_t num_config_events_written_ RTC_GUARDED_BY(*task_queue_);
- absl::optional<int64_t> output_period_ms_ RTC_GUARDED_BY(*task_queue_);
- int64_t last_output_ms_ RTC_GUARDED_BY(*task_queue_);
- bool output_scheduled_ RTC_GUARDED_BY(*task_queue_);
+ int64_t output_period_ms_ RTC_GUARDED_BY(task_queue_);
+ int64_t last_output_ms_ RTC_GUARDED_BY(task_queue_);
RTC_NO_UNIQUE_ADDRESS SequenceChecker logging_state_checker_;
- bool logging_state_started_ RTC_GUARDED_BY(logging_state_checker_);
+ bool logging_state_started_ RTC_GUARDED_BY(mutex_) = false;
+ bool immediately_output_mode_ RTC_GUARDED_BY(mutex_) = false;
+ bool need_schedule_output_ RTC_GUARDED_BY(mutex_) = false;
// Since we are posting tasks bound to `this`, it is critical that the event
// log and its members outlive `task_queue_`. Keep the `task_queue_`
// last to ensure it destructs first, or else tasks living on the queue might
// access other members after they've been torn down.
std::unique_ptr<rtc::TaskQueue> task_queue_;
+
+ Mutex mutex_;
};
} // namespace webrtc