/*
 *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
 *
 *  Use of this source code is governed by a BSD-style license
 *  that can be found in the LICENSE file in the root of the source
 *  tree. An additional intellectual property rights grant can be found
 *  in the file PATENTS.  All contributing project authors may
 *  be found in the AUTHORS file in the root of the source tree.
 */

#include "webrtc/test/channel_transport/udp_socket2_manager_win.h"

#include <assert.h>
#include <stdio.h>

#include "webrtc/system_wrappers/include/aligned_malloc.h"
#include "webrtc/test/channel_transport/udp_socket2_win.h"

namespace webrtc {
namespace test {

uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0;
bool UdpSocket2ManagerWindows::_wsaInit = false;

UdpSocket2ManagerWindows::UdpSocket2ManagerWindows()
    : UdpSocketManager(),
      _id(-1),
      _stopped(false),
      _init(false),
      _pCrit(CriticalSectionWrapper::CreateCriticalSection()),
      _ioCompletionHandle(NULL),
      _numActiveSockets(0),
      _event(EventWrapper::Create())
{
    _managerNumber = _numOfActiveManagers++;

    if(_numOfActiveManagers == 1)
    {
        WORD wVersionRequested = MAKEWORD(2, 2);
        WSADATA wsaData;
        _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0;
        // TODO (hellner): seems safer to use RAII for this. E.g. what happens
        //                 if a UdpSocket2ManagerWindows() created and destroyed
        //                 without being initialized.
    }
}

UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows()
{
    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
                 "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()",
                 _managerNumber);

    if(_init)
    {
        _pCrit->Enter();
        if(_numActiveSockets)
        {
            _pCrit->Leave();
            _event->Wait(INFINITE);
        }
        else
        {
            _pCrit->Leave();
        }
        StopWorkerThreads();

        for (WorkerList::iterator iter = _workerThreadsList.begin();
             iter != _workerThreadsList.end(); ++iter) {
          delete *iter;
        }
        _workerThreadsList.clear();
        _ioContextPool.Free();

        _numOfActiveManagers--;
        if(_ioCompletionHandle)
        {
            CloseHandle(_ioCompletionHandle);
        }
        if (_numOfActiveManagers == 0)
        {
            if(_wsaInit)
            {
                WSACleanup();
            }
        }
    }
    if(_pCrit)
    {
        delete _pCrit;
    }
    if(_event)
    {
        delete _event;
    }
}

bool UdpSocket2ManagerWindows::Init(int32_t id,
                                    uint8_t& numOfWorkThreads) {
  CriticalSectionScoped cs(_pCrit);
  if ((_id != -1) || (_numOfWorkThreads != 0)) {
      assert(_id != -1);
      assert(_numOfWorkThreads != 0);
      return false;
  }
  _id = id;
  _numOfWorkThreads = numOfWorkThreads;
  return true;
}

bool UdpSocket2ManagerWindows::Start()
{
    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
                 "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber);
    if(!_init)
    {
        StartWorkerThreads();
    }

    if(!_init)
    {
        return false;
    }
    _pCrit->Enter();
    // Start worker threads.
    _stopped = false;
    int32_t error = 0;
    for (WorkerList::iterator iter = _workerThreadsList.begin();
         iter != _workerThreadsList.end() && !error; ++iter) {
      if(!(*iter)->Start())
        error = 1;
    }
    if(error)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::Start() error starting worker\
 threads",
            _managerNumber);
        _pCrit->Leave();
        return false;
    }
    _pCrit->Leave();
    return true;
}

bool UdpSocket2ManagerWindows::StartWorkerThreads()
{
    if(!_init)
    {
        _pCrit->Enter();

        _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL,
                                                     0, 0);
        if(_ioCompletionHandle == NULL)
        {
            int32_t error = GetLastError();
            WEBRTC_TRACE(
                kTraceError,
                kTraceTransport,
                _id,
                "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()"
                "_ioCompletioHandle == NULL: error:%d",
                _managerNumber,error);
            _pCrit->Leave();
            return false;
        }

        // Create worker threads.
        uint32_t i = 0;
        bool error = false;
        while(i < _numOfWorkThreads && !error)
        {
            UdpSocket2WorkerWindows* pWorker =
                new UdpSocket2WorkerWindows(_ioCompletionHandle);
            if(pWorker->Init() != 0)
            {
                error = true;
                delete pWorker;
                break;
            }
            _workerThreadsList.push_front(pWorker);
            i++;
        }
        if(error)
        {
            WEBRTC_TRACE(
                kTraceError,
                kTraceTransport,
                _id,
                "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
                "creating work threads",
                _managerNumber);
            // Delete worker threads.
            for (WorkerList::iterator iter = _workerThreadsList.begin();
                 iter != _workerThreadsList.end(); ++iter) {
              delete *iter;
            }
            _workerThreadsList.clear();
            _pCrit->Leave();
            return false;
        }
        if(_ioContextPool.Init())
        {
            WEBRTC_TRACE(
                kTraceError,
                kTraceTransport,
                _id,
                "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error "
                "initiating _ioContextPool",
                _managerNumber);
            _pCrit->Leave();
            return false;
        }
        _init = true;
        WEBRTC_TRACE(
            kTraceDebug,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work "
            "threads created and initialized",
            _numOfWorkThreads);
        _pCrit->Leave();
    }
    return true;
}

bool UdpSocket2ManagerWindows::Stop()
{
    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
                 "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber);

    if(!_init)
    {
        return false;
    }
    _pCrit->Enter();
    _stopped = true;
    if(_numActiveSockets)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::Stop() there is still active\
 sockets",
            _managerNumber);
        _pCrit->Leave();
        return false;
    }
    // No active sockets. Stop all worker threads.
    bool result = StopWorkerThreads();
    _pCrit->Leave();
    return result;
}

bool UdpSocket2ManagerWindows::StopWorkerThreads()
{
    int32_t error = 0;
    WEBRTC_TRACE(
        kTraceDebug,
        kTraceTransport,
        _id,
        "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\
 threadsStoped, numActicve Sockets=%d",
        _managerNumber,
        _numActiveSockets);

    // Release all threads waiting for GetQueuedCompletionStatus(..).
    if(_ioCompletionHandle)
    {
        uint32_t i = 0;
        for(i = 0; i < _workerThreadsList.size(); i++)
        {
            PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL);
        }
    }
    for (WorkerList::iterator iter = _workerThreadsList.begin();
         iter != _workerThreadsList.end(); ++iter) {
        if((*iter)->Stop() == false)
        {
            error = -1;
            WEBRTC_TRACE(kTraceWarning,  kTraceTransport, -1,
                         "failed to stop worker thread");
        }
    }

    if(error)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\
 worker threads",
            _managerNumber);
        return false;
    }
    return true;
}

bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s)
{
    WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id,
                 "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber);
    if(!_init)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\
 initialized",
            _managerNumber);
        return false;
    }
    _pCrit->Enter();
    if(s == NULL)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL",
            _managerNumber);
        _pCrit->Leave();
        return false;
    }
    if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET)
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\
 %d",
            _managerNumber,
            (int32_t)s->GetFd());
        _pCrit->Leave();
        return false;

    }
    _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(),
                                                 _ioCompletionHandle,
                                                 (ULONG_PTR)(s), 0);
    if(_ioCompletionHandle == NULL)
    {
        int32_t error = GetLastError();
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\
 completion: %d",
            _managerNumber,
            error);
        _pCrit->Leave();
        return false;
    }
    _numActiveSockets++;
    _pCrit->Leave();
    return true;
}
bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s)
{
    if(!_init)
    {
        return false;
    }
    _pCrit->Enter();
    _numActiveSockets--;
    if(_numActiveSockets == 0)
    {
        _event->Set();
    }
    _pCrit->Leave();
    return true;
}

PerIoContext* UdpSocket2ManagerWindows::PopIoContext()
{
    if(!_init)
    {
        return NULL;
    }

    PerIoContext* pIoC = NULL;
    if(!_stopped)
    {
        pIoC = _ioContextPool.PopIoContext();
    }else
    {
        WEBRTC_TRACE(
            kTraceError,
            kTraceTransport,
            _id,
            "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started",
            _managerNumber);
    }
    return pIoC;
}

int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext)
{
    return _ioContextPool.PushIoContext(pIoContext);
}

IoContextPool::IoContextPool()
    : _pListHead(NULL),
      _init(false),
      _size(0),
      _inUse(0)
{
}

IoContextPool::~IoContextPool()
{
    Free();
    assert(_size.Value() == 0);
    AlignedFree(_pListHead);
}

int32_t IoContextPool::Init(uint32_t /*increaseSize*/)
{
    if(_init)
    {
        return 0;
    }

    _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER),
                                              MEMORY_ALLOCATION_ALIGNMENT);
    if(_pListHead == NULL)
    {
        return -1;
    }
    InitializeSListHead(_pListHead);
    _init = true;
    return 0;
}

PerIoContext* IoContextPool::PopIoContext()
{
    if(!_init)
    {
        return NULL;
    }

    PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    if(pListEntry == NULL)
    {
        IoContextPoolItem* item = (IoContextPoolItem*)
            AlignedMalloc(
                sizeof(IoContextPoolItem),
                MEMORY_ALLOCATION_ALIGNMENT);
        if(item == NULL)
        {
            return NULL;
        }
        memset(&item->payload.ioContext,0,sizeof(PerIoContext));
        item->payload.base = item;
        pListEntry = &(item->itemEntry);
        ++_size;
    }
    ++_inUse;
    return &((IoContextPoolItem*)pListEntry)->payload.ioContext;
}

int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext)
{
    // TODO (hellner): Overlapped IO should be completed at this point. Perhaps
    //                 add an assert?
    const bool overlappedIOCompleted = HasOverlappedIoCompleted(
        (LPOVERLAPPED)pIoContext);

    IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base;

    const int32_t usedItems = --_inUse;
    const int32_t totalItems = _size.Value();
    const int32_t freeItems = totalItems - usedItems;
    if(freeItems < 0)
    {
        assert(false);
        AlignedFree(item);
        return -1;
    }
    if((freeItems >= totalItems>>1) &&
        overlappedIOCompleted)
    {
        AlignedFree(item);
        --_size;
        return 0;
    }
    InterlockedPushEntrySList(_pListHead, &(item->itemEntry));
    return 0;
}

int32_t IoContextPool::Free()
{
    if(!_init)
    {
        return 0;
    }

    int32_t itemsFreed = 0;
    PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead);
    while(pListEntry != NULL)
    {
        IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry);
        AlignedFree(item);
        --_size;
        itemsFreed++;
        pListEntry = InterlockedPopEntrySList(_pListHead);
    }
    return itemsFreed;
}

int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0;

UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle)
    : _ioCompletionHandle(ioCompletionHandle),
      _pThread(Run, this, "UdpSocket2ManagerWindows_thread"),
      _init(false) {
    _workerNumber = _numOfWorkers++;
    WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
                 "UdpSocket2WorkerWindows created");
}

UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows()
{
    WEBRTC_TRACE(kTraceMemory,  kTraceTransport, -1,
                 "UdpSocket2WorkerWindows deleted");
}

bool UdpSocket2WorkerWindows::Start()
{
    WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
                 "Start UdpSocket2WorkerWindows");
    _pThread.Start();

    _pThread.SetPriority(rtc::kRealtimePriority);
    return true;
}

bool UdpSocket2WorkerWindows::Stop()
{
    WEBRTC_TRACE(kTraceStateInfo,  kTraceTransport, -1,
                 "Stop UdpSocket2WorkerWindows");
    _pThread.Stop();
    return true;
}

int32_t UdpSocket2WorkerWindows::Init()
{
  _init = true;
  return 0;
}

bool UdpSocket2WorkerWindows::Run(void* obj)
{
    UdpSocket2WorkerWindows* pWorker =
        static_cast<UdpSocket2WorkerWindows*>(obj);
    return pWorker->Process();
}

// Process should always return true. Stopping the worker threads is done in
// the UdpSocket2ManagerWindows::StopWorkerThreads() function.
bool UdpSocket2WorkerWindows::Process()
{
    int32_t success = 0;
    DWORD ioSize = 0;
    UdpSocket2Windows* pSocket = NULL;
    PerIoContext* pIOContext = 0;
    OVERLAPPED* pOverlapped = 0;
    success = GetQueuedCompletionStatus(_ioCompletionHandle,
                                        &ioSize,
                                       (ULONG_PTR*)&pSocket, &pOverlapped, 200);

    uint32_t error = 0;
    if(!success)
    {
        error = GetLastError();
        if(error == WAIT_TIMEOUT)
        {
            return true;
        }
        // This may happen if e.g. PostQueuedCompletionStatus() has been called.
        // The IO context still needs to be reclaimed or re-used which is done
        // in UdpSocket2Windows::IOCompleted(..).
    }
    if(pSocket == NULL)
    {
        WEBRTC_TRACE(
            kTraceDebug,
            kTraceTransport,
            -1,
            "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread",
            _workerNumber);
        return true;
    }
    pIOContext = (PerIoContext*)pOverlapped;
    pSocket->IOCompleted(pIOContext,ioSize,error);
    return true;
}

}  // namespace test
}  // namespace webrtc
