|  | /* | 
|  | *  Copyright (c) 2015 The WebRTC project authors. All Rights Reserved. | 
|  | * | 
|  | *  Use of this source code is governed by a BSD-style license | 
|  | *  that can be found in the LICENSE file in the root of the source | 
|  | *  tree. An additional intellectual property rights grant can be found | 
|  | *  in the file PATENTS.  All contributing project authors may | 
|  | *  be found in the AUTHORS file in the root of the source tree. | 
|  | */ | 
|  |  | 
|  | #include "logging/rtc_event_log/rtc_event_log_impl.h" | 
|  |  | 
|  | #include <cstddef> | 
|  | #include <cstdint> | 
|  | #include <functional> | 
|  | #include <iterator> | 
|  | #include <memory> | 
|  | #include <string> | 
|  | #include <utility> | 
|  |  | 
|  | #include "absl/strings/string_view.h" | 
|  | #include "api/environment/environment.h" | 
|  | #include "api/field_trials_view.h" | 
|  | #include "api/rtc_event_log/rtc_event.h" | 
|  | #include "api/rtc_event_log/rtc_event_log.h" | 
|  | #include "api/rtc_event_log_output.h" | 
|  | #include "api/sequence_checker.h" | 
|  | #include "api/task_queue/task_queue_base.h" | 
|  | #include "api/units/time_delta.h" | 
|  | #include "logging/rtc_event_log/encoder/rtc_event_log_encoder.h" | 
|  | #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_legacy.h" | 
|  | #include "logging/rtc_event_log/encoder/rtc_event_log_encoder_new_format.h" | 
|  | #include "rtc_base/checks.h" | 
|  | #include "rtc_base/event.h" | 
|  | #include "rtc_base/logging.h" | 
|  | #include "rtc_base/numerics/safe_minmax.h" | 
|  | #include "rtc_base/synchronization/mutex.h" | 
|  | #include "rtc_base/time_utils.h" | 
|  |  | 
|  | namespace webrtc { | 
|  | namespace { | 
|  |  | 
|  | std::unique_ptr<RtcEventLogEncoder> CreateEncoder(const Environment& env) { | 
|  | if (env.field_trials().IsDisabled("WebRTC-RtcEventLogNewFormat")) { | 
|  | RTC_DLOG(LS_INFO) << "Creating legacy encoder for RTC event log."; | 
|  | return std::make_unique<RtcEventLogEncoderLegacy>(); | 
|  | } else { | 
|  | RTC_DLOG(LS_INFO) << "Creating new format encoder for RTC event log."; | 
|  | return std::make_unique<RtcEventLogEncoderNewFormat>(env.field_trials()); | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace | 
|  |  | 
|  | RtcEventLogImpl::RtcEventLogImpl(const Environment& env) | 
|  | : RtcEventLogImpl(CreateEncoder(env), &env.task_queue_factory()) {} | 
|  |  | 
|  | RtcEventLogImpl::RtcEventLogImpl(std::unique_ptr<RtcEventLogEncoder> encoder, | 
|  | TaskQueueFactory* task_queue_factory, | 
|  | size_t max_events_in_history, | 
|  | size_t max_config_events_in_history) | 
|  | : max_events_in_history_(max_events_in_history), | 
|  | max_config_events_in_history_(max_config_events_in_history), | 
|  | event_encoder_(std::move(encoder)), | 
|  | last_output_ms_(rtc::TimeMillis()), | 
|  | task_queue_(task_queue_factory->CreateTaskQueue( | 
|  | "rtc_event_log", | 
|  | TaskQueueFactory::Priority::NORMAL)) {} | 
|  |  | 
|  | RtcEventLogImpl::~RtcEventLogImpl() { | 
|  | // If we're logging to the output, this will stop that. Blocking function. | 
|  | mutex_.Lock(); | 
|  | bool started = logging_state_started_; | 
|  | mutex_.Unlock(); | 
|  |  | 
|  | if (started) { | 
|  | logging_state_checker_.Detach(); | 
|  | StopLogging(); | 
|  | } | 
|  |  | 
|  | // Since we are posting tasks bound to `this`, it is critical that the event | 
|  | // log and its members outlive `task_queue_`. Destruct `task_queue_` first | 
|  | // to ensure tasks living on the queue can access other members. | 
|  | // We want to block on any executing task by deleting TaskQueue before | 
|  | // we set unique_ptr's internal pointer to null. | 
|  | task_queue_.get_deleter()(task_queue_.get()); | 
|  | task_queue_.release(); | 
|  | } | 
|  |  | 
|  | bool RtcEventLogImpl::StartLogging(std::unique_ptr<RtcEventLogOutput> output, | 
|  | int64_t output_period_ms) { | 
|  | RTC_DCHECK(output); | 
|  | RTC_DCHECK(output_period_ms == kImmediateOutput || output_period_ms > 0); | 
|  |  | 
|  | if (!output->IsActive()) { | 
|  | // TODO(eladalon): We may want to remove the IsActive method. Otherwise | 
|  | // we probably want to be consistent and terminate any existing output. | 
|  | return false; | 
|  | } | 
|  |  | 
|  | const int64_t timestamp_us = rtc::TimeMillis() * 1000; | 
|  | const int64_t utc_time_us = rtc::TimeUTCMillis() * 1000; | 
|  | RTC_LOG(LS_INFO) << "Starting WebRTC event log. (Timestamp, UTC) = (" | 
|  | << timestamp_us << ", " << utc_time_us << ")."; | 
|  |  | 
|  | RTC_DCHECK_RUN_ON(&logging_state_checker_); | 
|  | MutexLock lock(&mutex_); | 
|  | logging_state_started_ = true; | 
|  | immediately_output_mode_ = (output_period_ms == kImmediateOutput); | 
|  | need_schedule_output_ = (output_period_ms != kImmediateOutput); | 
|  |  | 
|  | // Binding to `this` is safe because `this` outlives the `task_queue_`. | 
|  | task_queue_->PostTask([this, output_period_ms, timestamp_us, utc_time_us, | 
|  | output = std::move(output), | 
|  | histories = ExtractRecentHistories()]() mutable { | 
|  | RTC_DCHECK_RUN_ON(task_queue_.get()); | 
|  | RTC_DCHECK(output); | 
|  | RTC_DCHECK(output->IsActive()); | 
|  | output_period_ms_ = output_period_ms; | 
|  | event_output_ = std::move(output); | 
|  |  | 
|  | WriteToOutput(event_encoder_->EncodeLogStart(timestamp_us, utc_time_us)); | 
|  | // Load all configs of previous sessions. | 
|  | if (!all_config_history_.empty()) { | 
|  | EventDeque& history = histories.config_history; | 
|  | history.insert(history.begin(), | 
|  | std::make_move_iterator(all_config_history_.begin()), | 
|  | std::make_move_iterator(all_config_history_.end())); | 
|  | all_config_history_.clear(); | 
|  |  | 
|  | if (history.size() > max_config_events_in_history_) { | 
|  | RTC_LOG(LS_WARNING) | 
|  | << "Dropping config events: " << history.size() | 
|  | << " exceeds maximum " << max_config_events_in_history_; | 
|  | history.erase(history.begin(), history.begin() + history.size() - | 
|  | max_config_events_in_history_); | 
|  | } | 
|  | } | 
|  | LogEventsToOutput(std::move(histories)); | 
|  | }); | 
|  |  | 
|  | return true; | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::StopLogging() { | 
|  | RTC_DLOG(LS_INFO) << "Stopping WebRTC event log."; | 
|  | // TODO(bugs.webrtc.org/14449): Do not block current thread waiting on the | 
|  | // task queue. It might work for now, for current callers, but disallows | 
|  | // caller to share threads with the `task_queue_`. | 
|  | rtc::Event output_stopped; | 
|  | StopLogging([&output_stopped]() { output_stopped.Set(); }); | 
|  | output_stopped.Wait(rtc::Event::kForever); | 
|  |  | 
|  | RTC_DLOG(LS_INFO) << "WebRTC event log successfully stopped."; | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::StopLogging(std::function<void()> callback) { | 
|  | RTC_DCHECK_RUN_ON(&logging_state_checker_); | 
|  | MutexLock lock(&mutex_); | 
|  | logging_state_started_ = false; | 
|  | task_queue_->PostTask( | 
|  | [this, callback, histories = ExtractRecentHistories()]() mutable { | 
|  | RTC_DCHECK_RUN_ON(task_queue_.get()); | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | LogEventsToOutput(std::move(histories)); | 
|  | } | 
|  | StopLoggingInternal(); | 
|  | callback(); | 
|  | }); | 
|  | } | 
|  |  | 
|  | RtcEventLogImpl::EventHistories RtcEventLogImpl::ExtractRecentHistories() { | 
|  | EventHistories histories; | 
|  | std::swap(histories, recent_); | 
|  | return histories; | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::Log(std::unique_ptr<RtcEvent> event) { | 
|  | RTC_CHECK(event); | 
|  | MutexLock lock(&mutex_); | 
|  |  | 
|  | LogToMemory(std::move(event)); | 
|  | if (logging_state_started_) { | 
|  | if (ShouldOutputImmediately()) { | 
|  | // Binding to `this` is safe because `this` outlives the `task_queue_`. | 
|  | task_queue_->PostTask( | 
|  | [this, histories = ExtractRecentHistories()]() mutable { | 
|  | RTC_DCHECK_RUN_ON(task_queue_.get()); | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | LogEventsToOutput(std::move(histories)); | 
|  | } | 
|  | }); | 
|  | } else if (need_schedule_output_) { | 
|  | need_schedule_output_ = false; | 
|  | // Binding to `this` is safe because `this` outlives the `task_queue_`. | 
|  | task_queue_->PostTask([this]() mutable { | 
|  | RTC_DCHECK_RUN_ON(task_queue_.get()); | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | ScheduleOutput(); | 
|  | } | 
|  | }); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | bool RtcEventLogImpl::ShouldOutputImmediately() { | 
|  | if (recent_.history.size() >= max_events_in_history_) { | 
|  | // We have to emergency drain the buffer. We can't wait for the scheduled | 
|  | // output task because there might be other event incoming before that. | 
|  | return true; | 
|  | } | 
|  |  | 
|  | return immediately_output_mode_; | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::ScheduleOutput() { | 
|  | RTC_DCHECK(output_period_ms_ != kImmediateOutput); | 
|  | // Binding to `this` is safe because `this` outlives the `task_queue_`. | 
|  | auto output_task = [this]() { | 
|  | RTC_DCHECK_RUN_ON(task_queue_.get()); | 
|  | // Allow scheduled output if the `event_output_` is valid. | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | mutex_.Lock(); | 
|  | RTC_DCHECK(!need_schedule_output_); | 
|  | // Let the next `Log()` to schedule output. | 
|  | need_schedule_output_ = true; | 
|  | EventHistories histories = ExtractRecentHistories(); | 
|  | mutex_.Unlock(); | 
|  | LogEventsToOutput(std::move(histories)); | 
|  | } | 
|  | }; | 
|  | const int64_t now_ms = rtc::TimeMillis(); | 
|  | const int64_t time_since_output_ms = now_ms - last_output_ms_; | 
|  | const int32_t delay = rtc::SafeClamp(output_period_ms_ - time_since_output_ms, | 
|  | 0, output_period_ms_); | 
|  | task_queue_->PostDelayedTask(std::move(output_task), | 
|  | TimeDelta::Millis(delay)); | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::LogToMemory(std::unique_ptr<RtcEvent> event) { | 
|  | EventDeque& container = | 
|  | event->IsConfigEvent() ? recent_.config_history : recent_.history; | 
|  | const size_t container_max_size = event->IsConfigEvent() | 
|  | ? max_config_events_in_history_ | 
|  | : max_events_in_history_; | 
|  |  | 
|  | // Shouldn't lose events if started. | 
|  | if (container.size() >= container_max_size && !logging_state_started_) { | 
|  | container.pop_front(); | 
|  | } | 
|  | container.push_back(std::move(event)); | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::LogEventsToOutput(EventHistories histories) { | 
|  | last_output_ms_ = rtc::TimeMillis(); | 
|  |  | 
|  | // Serialize the stream configurations. | 
|  | std::string encoded_configs = event_encoder_->EncodeBatch( | 
|  | histories.config_history.begin(), histories.config_history.end()); | 
|  |  | 
|  | // Serialize the events in the event queue. Note that the write may fail, | 
|  | // for example if we are writing to a file and have reached the maximum limit. | 
|  | // We don't get any feedback if this happens, so we still remove the events | 
|  | // from the event log history. This is normally not a problem, but if another | 
|  | // log is started immediately after the first one becomes full, then one | 
|  | // cannot rely on the second log to contain everything that isn't in the first | 
|  | // log; one batch of events might be missing. | 
|  | std::string encoded_history = event_encoder_->EncodeBatch( | 
|  | histories.history.begin(), histories.history.end()); | 
|  |  | 
|  | WriteConfigsAndHistoryToOutput(encoded_configs, encoded_history); | 
|  |  | 
|  | // Unlike other events, the configs are retained. If we stop/start logging | 
|  | // again, these configs are used to interpret other events. | 
|  | all_config_history_.insert( | 
|  | all_config_history_.end(), | 
|  | std::make_move_iterator(histories.config_history.begin()), | 
|  | std::make_move_iterator(histories.config_history.end())); | 
|  | if (all_config_history_.size() > max_config_events_in_history_) { | 
|  | RTC_LOG(LS_WARNING) << "Dropping config events: " | 
|  | << all_config_history_.size() << " exceeds maximum " | 
|  | << max_config_events_in_history_; | 
|  | all_config_history_.erase(all_config_history_.begin(), | 
|  | all_config_history_.begin() + | 
|  | all_config_history_.size() - | 
|  | max_config_events_in_history_); | 
|  | } | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::WriteConfigsAndHistoryToOutput( | 
|  | absl::string_view encoded_configs, | 
|  | absl::string_view encoded_history) { | 
|  | // This function is used to merge the strings instead of calling the output | 
|  | // object twice with small strings. The function also avoids copying any | 
|  | // strings in the typical case where there are no config events. | 
|  | if (encoded_configs.empty()) { | 
|  | WriteToOutput(encoded_history);  // Typical case. | 
|  | } else if (encoded_history.empty()) { | 
|  | WriteToOutput(encoded_configs);  // Very unusual case. | 
|  | } else { | 
|  | std::string s; | 
|  | s.reserve(encoded_configs.size() + encoded_history.size()); | 
|  | s.append(encoded_configs.data(), encoded_configs.size()); | 
|  | s.append(encoded_history.data(), encoded_history.size()); | 
|  | WriteToOutput(s); | 
|  | } | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::StopOutput() { | 
|  | event_output_.reset(); | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::StopLoggingInternal() { | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | const int64_t timestamp_us = rtc::TimeMillis() * 1000; | 
|  | event_output_->Write(event_encoder_->EncodeLogEnd(timestamp_us)); | 
|  | } | 
|  | StopOutput(); | 
|  | } | 
|  |  | 
|  | void RtcEventLogImpl::WriteToOutput(absl::string_view output_string) { | 
|  | if (event_output_) { | 
|  | RTC_DCHECK(event_output_->IsActive()); | 
|  | if (!event_output_->Write(output_string)) { | 
|  | RTC_LOG(LS_ERROR) << "Failed to write RTC event to output."; | 
|  | // The first failure closes the output. | 
|  | RTC_DCHECK(!event_output_->IsActive()); | 
|  | StopOutput();  // Clean-up. | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | }  // namespace webrtc |