/*
 *  Copyright 2019 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */
#include "test/time_controller/simulated_time_controller.h"

#include <algorithm>
#include <deque>
#include <list>
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <vector>

#include "absl/strings/string_view.h"

namespace webrtc {
namespace {
// Helper function to remove from a std container by value.
template <class C>
bool RemoveByValue(C& vec, typename C::value_type val) {
  auto it = std::find(vec.begin(), vec.end(), val);
  if (it == vec.end())
    return false;
  vec.erase(it);
  return true;
}
}  // namespace

namespace sim_time_impl {
class SimulatedSequenceRunner : public ProcessThread, public TaskQueueBase {
 public:
  SimulatedSequenceRunner(SimulatedTimeControllerImpl* handler,
                          absl::string_view queue_name)
      : handler_(handler), name_(queue_name) {}
  ~SimulatedSequenceRunner() override { handler_->Unregister(this); }

  // Provides next run time.
  Timestamp GetNextRunTime() const;

  // Iterates through delayed tasks and modules and moves them to the ready set
  // if they are supposed to execute by |at time|.
  void UpdateReady(Timestamp at_time);
  // Runs all ready tasks and modules and updates next run time.
  void Run(Timestamp at_time);

  // TaskQueueBase interface
  void Delete() override;
  // Note: PostTask is also in ProcessThread interface.
  void PostTask(std::unique_ptr<QueuedTask> task) override;
  void PostDelayedTask(std::unique_ptr<QueuedTask> task,
                       uint32_t milliseconds) override;

  // ProcessThread interface
  void Start() override;
  void Stop() override;
  void WakeUp(Module* module) override;
  void RegisterModule(Module* module, const rtc::Location& from) override;
  void DeRegisterModule(Module* module) override;
  // Promoted to public for use in SimulatedTimeControllerImpl::YieldExecution.
  using CurrentTaskQueueSetter = TaskQueueBase::CurrentTaskQueueSetter;

 private:
  Timestamp GetCurrentTime() const { return handler_->CurrentTime(); }
  void RunReadyTasks(Timestamp at_time) RTC_LOCKS_EXCLUDED(lock_);
  void RunReadyModules(Timestamp at_time) RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
  void UpdateNextRunTime() RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
  Timestamp GetNextTime(Module* module, Timestamp at_time);

  SimulatedTimeControllerImpl* const handler_;
  const std::string name_;

  rtc::CriticalSection lock_;

  std::deque<std::unique_ptr<QueuedTask>> ready_tasks_ RTC_GUARDED_BY(lock_);
  std::map<Timestamp, std::vector<std::unique_ptr<QueuedTask>>> delayed_tasks_
      RTC_GUARDED_BY(lock_);

  bool process_thread_running_ RTC_GUARDED_BY(lock_) = false;
  std::vector<Module*> stopped_modules_ RTC_GUARDED_BY(lock_);
  std::vector<Module*> ready_modules_ RTC_GUARDED_BY(lock_);
  std::map<Timestamp, std::list<Module*>> delayed_modules_
      RTC_GUARDED_BY(lock_);

  Timestamp next_run_time_ RTC_GUARDED_BY(lock_) = Timestamp::PlusInfinity();
};

Timestamp SimulatedSequenceRunner::GetNextRunTime() const {
  rtc::CritScope lock(&lock_);
  return next_run_time_;
}

void SimulatedSequenceRunner::UpdateReady(Timestamp at_time) {
  rtc::CritScope lock(&lock_);
  for (auto it = delayed_tasks_.begin();
       it != delayed_tasks_.end() && it->first <= at_time;
       it = delayed_tasks_.erase(it)) {
    for (auto& task : it->second) {
      ready_tasks_.emplace_back(std::move(task));
    }
  }
  for (auto it = delayed_modules_.begin();
       it != delayed_modules_.end() && it->first <= at_time;
       it = delayed_modules_.erase(it)) {
    for (auto module : it->second) {
      ready_modules_.push_back(module);
    }
  }
}

void SimulatedSequenceRunner::Run(Timestamp at_time) {
  RunReadyTasks(at_time);
  rtc::CritScope lock(&lock_);
  RunReadyModules(at_time);
  UpdateNextRunTime();
}

void SimulatedSequenceRunner::Delete() {
  {
    rtc::CritScope lock(&lock_);
    ready_tasks_.clear();
    delayed_tasks_.clear();
  }
  delete this;
}

void SimulatedSequenceRunner::RunReadyTasks(Timestamp at_time) {
  std::deque<std::unique_ptr<QueuedTask>> ready_tasks;
  {
    rtc::CritScope lock(&lock_);
    ready_tasks.swap(ready_tasks_);
  }
  if (!ready_tasks.empty()) {
    CurrentTaskQueueSetter set_current(this);
    for (auto& ready : ready_tasks) {
      bool delete_task = ready->Run();
      if (delete_task) {
        ready.reset();
      } else {
        ready.release();
      }
    }
  }
}

void SimulatedSequenceRunner::RunReadyModules(Timestamp at_time) {
  if (!ready_modules_.empty()) {
    CurrentTaskQueueSetter set_current(this);
    for (auto* module : ready_modules_) {
      module->Process();
      delayed_modules_[GetNextTime(module, at_time)].push_back(module);
    }
  }
  ready_modules_.clear();
}

void SimulatedSequenceRunner::UpdateNextRunTime() {
  if (!ready_tasks_.empty() || !ready_modules_.empty()) {
    next_run_time_ = Timestamp::MinusInfinity();
  } else {
    next_run_time_ = Timestamp::PlusInfinity();
    if (!delayed_tasks_.empty())
      next_run_time_ = std::min(next_run_time_, delayed_tasks_.begin()->first);
    if (!delayed_modules_.empty())
      next_run_time_ =
          std::min(next_run_time_, delayed_modules_.begin()->first);
  }
}

void SimulatedSequenceRunner::PostTask(std::unique_ptr<QueuedTask> task) {
  rtc::CritScope lock(&lock_);
  ready_tasks_.emplace_back(std::move(task));
  next_run_time_ = Timestamp::MinusInfinity();
}

void SimulatedSequenceRunner::PostDelayedTask(std::unique_ptr<QueuedTask> task,
                                              uint32_t milliseconds) {
  rtc::CritScope lock(&lock_);
  Timestamp target_time = GetCurrentTime() + TimeDelta::ms(milliseconds);
  delayed_tasks_[target_time].push_back(std::move(task));
  next_run_time_ = std::min(next_run_time_, target_time);
}

void SimulatedSequenceRunner::Start() {
  std::vector<Module*> starting;
  {
    rtc::CritScope lock(&lock_);
    if (process_thread_running_)
      return;
    process_thread_running_ = true;
    starting.swap(stopped_modules_);
  }
  for (auto& module : starting)
    module->ProcessThreadAttached(this);

  Timestamp at_time = GetCurrentTime();
  rtc::CritScope lock(&lock_);
  for (auto& module : starting)
    delayed_modules_[GetNextTime(module, at_time)].push_back(module);
  UpdateNextRunTime();
}

void SimulatedSequenceRunner::Stop() {
  std::vector<Module*> stopping;
  {
    rtc::CritScope lock(&lock_);
    process_thread_running_ = false;

    for (auto* ready : ready_modules_)
      stopped_modules_.push_back(ready);
    ready_modules_.clear();

    for (auto& delayed : delayed_modules_) {
      for (auto mod : delayed.second)
        stopped_modules_.push_back(mod);
    }
    delayed_modules_.clear();

    stopping = stopped_modules_;
  }
  for (auto& module : stopping)
    module->ProcessThreadAttached(nullptr);
}

void SimulatedSequenceRunner::WakeUp(Module* module) {
  rtc::CritScope lock(&lock_);
  // If we already are planning to run this module as soon as possible, we don't
  // need to do anything.
  for (auto mod : ready_modules_)
    if (mod == module)
      return;

  for (auto it = delayed_modules_.begin(); it != delayed_modules_.end(); ++it) {
    if (RemoveByValue(it->second, module))
      break;
  }
  Timestamp next_time = GetNextTime(module, GetCurrentTime());
  delayed_modules_[next_time].push_back(module);
  next_run_time_ = std::min(next_run_time_, next_time);
}

void SimulatedSequenceRunner::RegisterModule(Module* module,
                                             const rtc::Location& from) {
  module->ProcessThreadAttached(this);
  rtc::CritScope lock(&lock_);
  if (!process_thread_running_) {
    stopped_modules_.push_back(module);
  } else {
    Timestamp next_time = GetNextTime(module, GetCurrentTime());
    delayed_modules_[next_time].push_back(module);
    next_run_time_ = std::min(next_run_time_, next_time);
  }
}

void SimulatedSequenceRunner::DeRegisterModule(Module* module) {
  bool modules_running;
  {
    rtc::CritScope lock(&lock_);
    if (!process_thread_running_) {
      RemoveByValue(stopped_modules_, module);
    } else {
      bool removed = RemoveByValue(ready_modules_, module);
      if (!removed) {
        for (auto& pair : delayed_modules_) {
          if (RemoveByValue(pair.second, module))
            break;
        }
      }
    }
    modules_running = process_thread_running_;
  }
  if (modules_running)
    module->ProcessThreadAttached(nullptr);
}

Timestamp SimulatedSequenceRunner::GetNextTime(Module* module,
                                               Timestamp at_time) {
  CurrentTaskQueueSetter set_current(this);
  return at_time + TimeDelta::ms(module->TimeUntilNextProcess());
}

SimulatedTimeControllerImpl::SimulatedTimeControllerImpl(Timestamp start_time)
    : thread_id_(rtc::CurrentThreadId()), current_time_(start_time) {}

SimulatedTimeControllerImpl::~SimulatedTimeControllerImpl() = default;

std::unique_ptr<TaskQueueBase, TaskQueueDeleter>
SimulatedTimeControllerImpl::CreateTaskQueue(
    absl::string_view name,
    TaskQueueFactory::Priority priority) const {
  // TODO(srte): Remove the const cast when the interface is made mutable.
  auto mutable_this = const_cast<SimulatedTimeControllerImpl*>(this);
  auto task_queue = std::unique_ptr<SimulatedSequenceRunner, TaskQueueDeleter>(
      new SimulatedSequenceRunner(mutable_this, name));
  rtc::CritScope lock(&mutable_this->lock_);
  mutable_this->runners_.push_back(task_queue.get());
  return task_queue;
}

std::unique_ptr<ProcessThread> SimulatedTimeControllerImpl::CreateProcessThread(
    const char* thread_name) {
  rtc::CritScope lock(&lock_);
  auto process_thread =
      std::make_unique<SimulatedSequenceRunner>(this, thread_name);
  runners_.push_back(process_thread.get());
  return process_thread;
}

void SimulatedTimeControllerImpl::YieldExecution() {
  if (rtc::CurrentThreadId() == thread_id_) {
    TaskQueueBase* yielding_from = TaskQueueBase::Current();
    // Since we might continue execution on a process thread, we should reset
    // the thread local task queue reference. This ensures that thread checkers
    // won't think we are executing on the yielding task queue. It also ensure
    // that TaskQueueBase::Current() won't return the yielding task queue.
    SimulatedSequenceRunner::CurrentTaskQueueSetter reset_queue(nullptr);
    RTC_DCHECK_RUN_ON(&thread_checker_);
    // When we yield, we don't want to risk executing further tasks on the
    // currently executing task queue. If there's a ready task that also yields,
    // it's added to this set as well and only tasks on the remaining task
    // queues are executed.
    auto inserted = yielded_.insert(yielding_from);
    RTC_DCHECK(inserted.second);
    RunReadyRunners();
    yielded_.erase(inserted.first);
  }
}

void SimulatedTimeControllerImpl::RunReadyRunners() {
  RTC_DCHECK_RUN_ON(&thread_checker_);
  rtc::CritScope lock(&lock_);
  RTC_DCHECK_EQ(rtc::CurrentThreadId(), thread_id_);
  Timestamp current_time = CurrentTime();
  // Clearing |ready_runners_| in case this is a recursive call:
  // RunReadyRunners -> Run -> Event::Wait -> Yield ->RunReadyRunners
  ready_runners_.clear();

  // We repeat until we have no ready left to handle tasks posted by ready
  // runners.
  while (true) {
    for (auto* runner : runners_) {
      if (yielded_.find(runner) == yielded_.end() &&
          runner->GetNextRunTime() <= current_time) {
        ready_runners_.push_back(runner);
      }
    }
    if (ready_runners_.empty())
      return;
    while (!ready_runners_.empty()) {
      auto* runner = ready_runners_.front();
      ready_runners_.pop_front();
      runner->UpdateReady(current_time);
      // Note that the Run function might indirectly cause a call to
      // Unregister() which will recursively grab |lock_| again to remove items
      // from |ready_runners_|.
      runner->Run(current_time);
    }
  }
}

Timestamp SimulatedTimeControllerImpl::CurrentTime() const {
  rtc::CritScope lock(&time_lock_);
  return current_time_;
}

Timestamp SimulatedTimeControllerImpl::NextRunTime() const {
  Timestamp current_time = CurrentTime();
  Timestamp next_time = Timestamp::PlusInfinity();
  rtc::CritScope lock(&lock_);
  for (auto* runner : runners_) {
    Timestamp next_run_time = runner->GetNextRunTime();
    if (next_run_time <= current_time)
      return current_time;
    next_time = std::min(next_time, next_run_time);
  }
  return next_time;
}

void SimulatedTimeControllerImpl::AdvanceTime(Timestamp target_time) {
  rtc::CritScope time_lock(&time_lock_);
  RTC_DCHECK_GE(target_time, current_time_);
  current_time_ = target_time;
}

void SimulatedTimeControllerImpl::Unregister(SimulatedSequenceRunner* runner) {
  rtc::CritScope lock(&lock_);
  bool removed = RemoveByValue(runners_, runner);
  RTC_CHECK(removed);
  RemoveByValue(ready_runners_, runner);
}

}  // namespace sim_time_impl

GlobalSimulatedTimeController::GlobalSimulatedTimeController(
    Timestamp start_time)
    : sim_clock_(start_time.us()), impl_(start_time) {
  global_clock_.SetTime(start_time);
}

GlobalSimulatedTimeController::~GlobalSimulatedTimeController() = default;

Clock* GlobalSimulatedTimeController::GetClock() {
  return &sim_clock_;
}

TaskQueueFactory* GlobalSimulatedTimeController::GetTaskQueueFactory() {
  return &impl_;
}

std::unique_ptr<ProcessThread>
GlobalSimulatedTimeController::CreateProcessThread(const char* thread_name) {
  return impl_.CreateProcessThread(thread_name);
}

void GlobalSimulatedTimeController::AdvanceTime(TimeDelta duration) {
  rtc::ScopedYieldPolicy yield_policy(&impl_);
  Timestamp current_time = impl_.CurrentTime();
  Timestamp target_time = current_time + duration;
  RTC_DCHECK_EQ(current_time.us(), rtc::TimeMicros());
  while (current_time < target_time) {
    impl_.RunReadyRunners();
    Timestamp next_time = std::min(impl_.NextRunTime(), target_time);
    impl_.AdvanceTime(next_time);
    auto delta = next_time - current_time;
    current_time = next_time;
    sim_clock_.AdvanceTimeMicroseconds(delta.us());
    global_clock_.AdvanceTime(delta);
  }
}

void GlobalSimulatedTimeController::InvokeWithControlledYield(
    std::function<void()> closure) {
  rtc::ScopedYieldPolicy yield_policy(&impl_);
  closure();
}

rtc::YieldInterface* GlobalSimulatedTimeController::YieldInterface() {
  return &impl_;
}

// namespace sim_time_impl

}  // namespace webrtc
