/*
 *  Copyright 2019 The WebRTC Project Authors. All rights reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "rtc_base/operations_chain.h"

#include <atomic>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "rtc_base/bind.h"
#include "rtc_base/event.h"
#include "rtc_base/gunit.h"
#include "rtc_base/thread.h"
#include "test/gmock.h"
#include "test/gtest.h"

namespace rtc {

using ::testing::ElementsAre;

namespace {

constexpr int kDefaultTimeout = 3000;

}  // namespace

class OperationTracker {
 public:
  OperationTracker() : background_thread_(Thread::Create()) {
    background_thread_->Start();
  }
  // The caller is responsible for ensuring that no operations are pending.
  ~OperationTracker() {}

  // Creates a binding for the synchronous operation (see
  // StartSynchronousOperation() below).
  std::function<void(std::function<void()>)> BindSynchronousOperation(
      Event* operation_complete_event) {
    return [this, operation_complete_event](std::function<void()> callback) {
      StartSynchronousOperation(operation_complete_event, std::move(callback));
    };
  }

  // Creates a binding for the asynchronous operation (see
  // StartAsynchronousOperation() below).
  std::function<void(std::function<void()>)> BindAsynchronousOperation(
      Event* unblock_operation_event,
      Event* operation_complete_event) {
    return [this, unblock_operation_event,
            operation_complete_event](std::function<void()> callback) {
      StartAsynchronousOperation(unblock_operation_event,
                                 operation_complete_event, std::move(callback));
    };
  }

  // When an operation is completed, its associated Event* is added to this
  // list, in chronological order. This allows you to verify the order that
  // operations are executed.
  const std::vector<Event*>& completed_operation_events() const {
    return completed_operation_events_;
  }

 private:
  // This operation is completed synchronously; the callback is invoked before
  // the function returns.
  void StartSynchronousOperation(Event* operation_complete_event,
                                 std::function<void()> callback) {
    completed_operation_events_.push_back(operation_complete_event);
    operation_complete_event->Set();
    callback();
  }

  // This operation is completed asynchronously; it pings |background_thread_|,
  // blocking that thread until |unblock_operation_event| is signaled and then
  // completes upon posting back to the thread that the operation started on.
  // Note that this requires the starting thread to be executing tasks (handle
  // messages), i.e. must not be blocked.
  void StartAsynchronousOperation(Event* unblock_operation_event,
                                  Event* operation_complete_event,
                                  std::function<void()> callback) {
    Thread* current_thread = Thread::Current();
    background_thread_->PostTask(
        RTC_FROM_HERE, [this, current_thread, unblock_operation_event,
                        operation_complete_event, callback]() {
          unblock_operation_event->Wait(Event::kForever);
          current_thread->PostTask(
              RTC_FROM_HERE, [this, operation_complete_event, callback]() {
                completed_operation_events_.push_back(operation_complete_event);
                operation_complete_event->Set();
                callback();
              });
        });
  }

  std::unique_ptr<Thread> background_thread_;
  std::vector<Event*> completed_operation_events_;
};

// The OperationTrackerProxy ensures all operations are chained on a separate
// thread. This allows tests to block while chained operations are posting
// between threads.
class OperationTrackerProxy {
 public:
  OperationTrackerProxy()
      : operations_chain_thread_(Thread::Create()),
        operation_tracker_(nullptr),
        operations_chain_(nullptr) {
    operations_chain_thread_->Start();
  }

  std::unique_ptr<Event> Initialize() {
    std::unique_ptr<Event> event = std::make_unique<Event>();
    operations_chain_thread_->PostTask(
        RTC_FROM_HERE, [this, event_ptr = event.get()]() {
          operation_tracker_ = std::make_unique<OperationTracker>();
          operations_chain_ = OperationsChain::Create();
          event_ptr->Set();
        });
    return event;
  }

  void SetOnChainEmptyCallback(std::function<void()> on_chain_empty_callback) {
    Event event;
    operations_chain_thread_->PostTask(
        RTC_FROM_HERE,
        [this, &event,
         on_chain_empty_callback = std::move(on_chain_empty_callback)]() {
          operations_chain_->SetOnChainEmptyCallback(
              std::move(on_chain_empty_callback));
          event.Set();
        });
    event.Wait(Event::kForever);
  }

  bool IsEmpty() {
    Event event;
    bool is_empty = false;
    operations_chain_thread_->PostTask(
        RTC_FROM_HERE, [this, &event, &is_empty]() {
          is_empty = operations_chain_->IsEmpty();
          event.Set();
        });
    event.Wait(Event::kForever);
    return is_empty;
  }

  std::unique_ptr<Event> ReleaseOperationChain() {
    std::unique_ptr<Event> event = std::make_unique<Event>();
    operations_chain_thread_->PostTask(RTC_FROM_HERE,
                                       [this, event_ptr = event.get()]() {
                                         operations_chain_ = nullptr;
                                         event_ptr->Set();
                                       });
    return event;
  }

  // Chains a synchronous operation on the operation chain's thread.
  std::unique_ptr<Event> PostSynchronousOperation() {
    std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
    operations_chain_thread_->PostTask(
        RTC_FROM_HERE, [this, operation_complete_event_ptr =
                                  operation_complete_event.get()]() {
          operations_chain_->ChainOperation(
              operation_tracker_->BindSynchronousOperation(
                  operation_complete_event_ptr));
        });
    return operation_complete_event;
  }

  // Chains an asynchronous operation on the operation chain's thread. This
  // involves the operation chain thread and an additional background thread.
  std::unique_ptr<Event> PostAsynchronousOperation(
      Event* unblock_operation_event) {
    std::unique_ptr<Event> operation_complete_event = std::make_unique<Event>();
    operations_chain_thread_->PostTask(
        RTC_FROM_HERE,
        [this, unblock_operation_event,
         operation_complete_event_ptr = operation_complete_event.get()]() {
          operations_chain_->ChainOperation(
              operation_tracker_->BindAsynchronousOperation(
                  unblock_operation_event, operation_complete_event_ptr));
        });
    return operation_complete_event;
  }

  // The order of completed events. Touches the |operation_tracker_| on the
  // calling thread, this is only thread safe if all chained operations have
  // completed.
  const std::vector<Event*>& completed_operation_events() const {
    return operation_tracker_->completed_operation_events();
  }

 private:
  std::unique_ptr<Thread> operations_chain_thread_;
  std::unique_ptr<OperationTracker> operation_tracker_;
  scoped_refptr<OperationsChain> operations_chain_;
};

// On destruction, sets a boolean flag to true.
class SignalOnDestruction final {
 public:
  SignalOnDestruction(bool* destructor_called)
      : destructor_called_(destructor_called) {
    RTC_DCHECK(destructor_called_);
  }
  ~SignalOnDestruction() {
    // Moved objects will have |destructor_called_| set to null. Destroying a
    // moved SignalOnDestruction should not signal.
    if (destructor_called_) {
      *destructor_called_ = true;
    }
  }

  // Move operators.
  SignalOnDestruction(SignalOnDestruction&& other)
      : SignalOnDestruction(other.destructor_called_) {
    other.destructor_called_ = nullptr;
  }
  SignalOnDestruction& operator=(SignalOnDestruction&& other) {
    destructor_called_ = other.destructor_called_;
    other.destructor_called_ = nullptr;
    return *this;
  }

 private:
  bool* destructor_called_;

  RTC_DISALLOW_COPY_AND_ASSIGN(SignalOnDestruction);
};

TEST(OperationsChainTest, SynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  operation_tracker_proxy.PostSynchronousOperation()->Wait(Event::kForever);
}

TEST(OperationsChainTest, AsynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);
  // This should not be signaled until we unblock the operation.
  EXPECT_FALSE(async_operation_completed_event->Wait(0));
  // Unblock the operation and wait for it to complete.
  unblock_async_operation_event.Set();
  async_operation_completed_event->Wait(Event::kForever);
}

TEST(OperationsChainTest,
     SynchronousOperationsAreExecutedImmediatelyWhenChainIsEmpty) {
  // Testing synchonicity must be done without the OperationTrackerProxy to
  // ensure messages are not processed in parallel. This test has no background
  // threads.
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  OperationTracker operation_tracker;
  Event event0;
  operations_chain->ChainOperation(
      operation_tracker.BindSynchronousOperation(&event0));
  // This should already be signaled. (If it wasn't, waiting wouldn't help,
  // because we'd be blocking the only thread that exists.)
  EXPECT_TRUE(event0.Wait(0));
  // Chaining another operation should also execute immediately because the
  // chain should already be empty.
  Event event1;
  operations_chain->ChainOperation(
      operation_tracker.BindSynchronousOperation(&event1));
  EXPECT_TRUE(event1.Wait(0));
}

TEST(OperationsChainTest, AsynchronousOperationBlocksSynchronousOperation) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);

  auto sync_operation_completed_event =
      operation_tracker_proxy.PostSynchronousOperation();

  unblock_async_operation_event.Set();

  sync_operation_completed_event->Wait(Event::kForever);
  // The asynchronous avent should have blocked the synchronous event, meaning
  // this should already be signaled.
  EXPECT_TRUE(async_operation_completed_event->Wait(0));
}

TEST(OperationsChainTest, OperationsAreExecutedInOrder) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // Chain a mix of asynchronous and synchronous operations.
  Event operation0_unblock_event;
  auto operation0_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &operation0_unblock_event);

  Event operation1_unblock_event;
  auto operation1_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &operation1_unblock_event);

  auto operation2_completed_event =
      operation_tracker_proxy.PostSynchronousOperation();

  auto operation3_completed_event =
      operation_tracker_proxy.PostSynchronousOperation();

  Event operation4_unblock_event;
  auto operation4_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &operation4_unblock_event);

  auto operation5_completed_event =
      operation_tracker_proxy.PostSynchronousOperation();

  Event operation6_unblock_event;
  auto operation6_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &operation6_unblock_event);

  // Unblock events in reverse order. Operations 5, 3 and 2 are synchronous and
  // don't need to be unblocked.
  operation6_unblock_event.Set();
  operation4_unblock_event.Set();
  operation1_unblock_event.Set();
  operation0_unblock_event.Set();
  // Await all operations. The await-order shouldn't matter since they all get
  // executed eventually.
  operation0_completed_event->Wait(Event::kForever);
  operation1_completed_event->Wait(Event::kForever);
  operation2_completed_event->Wait(Event::kForever);
  operation3_completed_event->Wait(Event::kForever);
  operation4_completed_event->Wait(Event::kForever);
  operation5_completed_event->Wait(Event::kForever);
  operation6_completed_event->Wait(Event::kForever);

  EXPECT_THAT(
      operation_tracker_proxy.completed_operation_events(),
      ElementsAre(
          operation0_completed_event.get(), operation1_completed_event.get(),
          operation2_completed_event.get(), operation3_completed_event.get(),
          operation4_completed_event.get(), operation5_completed_event.get(),
          operation6_completed_event.get()));
}

TEST(OperationsChainTest, IsEmpty) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  // The chain is initially empty.
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());
  // Chain a single event.
  Event unblock_async_operation_event0;
  auto async_operation_completed_event0 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event0);
  // The chain is not empty while an event is pending.
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Completing the operation empties the chain.
  unblock_async_operation_event0.Set();
  async_operation_completed_event0->Wait(Event::kForever);
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());

  // Chain multiple events.
  Event unblock_async_operation_event1;
  auto async_operation_completed_event1 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event1);
  Event unblock_async_operation_event2;
  auto async_operation_completed_event2 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event2);
  // Again, the chain is not empty while an event is pending.
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Upon completing the first event, the chain is still not empty.
  unblock_async_operation_event1.Set();
  async_operation_completed_event1->Wait(Event::kForever);
  EXPECT_FALSE(operation_tracker_proxy.IsEmpty());
  // Completing the last evenet empties the chain.
  unblock_async_operation_event2.Set();
  async_operation_completed_event2->Wait(Event::kForever);
  EXPECT_TRUE(operation_tracker_proxy.IsEmpty());
}

TEST(OperationsChainTest, OnChainEmptyCallback) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  std::atomic<size_t> on_empty_callback_counter(0u);
  operation_tracker_proxy.SetOnChainEmptyCallback(
      [&on_empty_callback_counter] { ++on_empty_callback_counter; });

  // Chain a single event.
  Event unblock_async_operation_event0;
  auto async_operation_completed_event0 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event0);
  // The callback is not invoked until the operation has completed.
  EXPECT_EQ(0u, on_empty_callback_counter);
  // Completing the operation empties the chain, invoking the callback.
  unblock_async_operation_event0.Set();
  async_operation_completed_event0->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);

  // Chain multiple events.
  Event unblock_async_operation_event1;
  auto async_operation_completed_event1 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event1);
  Event unblock_async_operation_event2;
  auto async_operation_completed_event2 =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event2);
  // Again, the callback is not invoked until the operation has completed.
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);
  // Upon completing the first event, the chain is still not empty, so the
  // callback must not be invoked yet.
  unblock_async_operation_event1.Set();
  async_operation_completed_event1->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(1u == on_empty_callback_counter, kDefaultTimeout);
  // Completing the last evenet empties the chain, invoking the callback.
  unblock_async_operation_event2.Set();
  async_operation_completed_event2->Wait(Event::kForever);
  EXPECT_TRUE_WAIT(2u == on_empty_callback_counter, kDefaultTimeout);
}

TEST(OperationsChainTest,
     SafeToReleaseReferenceToOperationChainWhileOperationIsPending) {
  OperationTrackerProxy operation_tracker_proxy;
  operation_tracker_proxy.Initialize()->Wait(Event::kForever);

  Event unblock_async_operation_event;
  auto async_operation_completed_event =
      operation_tracker_proxy.PostAsynchronousOperation(
          &unblock_async_operation_event);

  // Pending operations keep the OperationChain alive, making it safe for the
  // test to release any references before unblocking the async operation.
  operation_tracker_proxy.ReleaseOperationChain()->Wait(Event::kForever);

  unblock_async_operation_event.Set();
  async_operation_completed_event->Wait(Event::kForever);
}

TEST(OperationsChainTest, FunctorIsNotDestroyedWhileExecuting) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();

  bool destructor_called = false;
  SignalOnDestruction signal_on_destruction(&destructor_called);

  operations_chain->ChainOperation(
      [signal_on_destruction = std::move(signal_on_destruction),
       &destructor_called](std::function<void()> callback) {
        EXPECT_FALSE(destructor_called);
        // Invoking the callback marks the operation as complete, popping the
        // Operation object from the OperationsChain internal queue.
        callback();
        // Even though the internal Operation object has been destroyed,
        // variables captured by this lambda expression must still be valid (the
        // associated functor must not be deleted while executing).
        EXPECT_FALSE(destructor_called);
      });
  // The lambda having executed synchronously and completed, its captured
  // variables should now have been deleted.
  EXPECT_TRUE(destructor_called);
}

#if RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)

TEST(OperationsChainDeathTest, OperationNotInvokingCallbackShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {}),
      "");
}

TEST(OperationsChainDeathTest,
     OperationInvokingCallbackMultipleTimesShouldCrash) {
  scoped_refptr<OperationsChain> operations_chain = OperationsChain::Create();
  EXPECT_DEATH(
      operations_chain->ChainOperation([](std::function<void()> callback) {
        // Signal that the operation has completed multiple times.
        callback();
        callback();
      }),
      "");
}

#endif  // RTC_DCHECK_IS_ON && GTEST_HAS_DEATH_TEST && !defined(WEBRTC_ANDROID)

}  // namespace rtc
