| /* |
| * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. |
| * |
| * Use of this source code is governed by a BSD-style license |
| * that can be found in the LICENSE file in the root of the source |
| * tree. An additional intellectual property rights grant can be found |
| * in the file PATENTS. All contributing project authors may |
| * be found in the AUTHORS file in the root of the source tree. |
| */ |
| |
| #include "webrtc/test/channel_transport/udp_socket2_manager_win.h" |
| |
| #include <assert.h> |
| #include <stdio.h> |
| |
| #include "webrtc/system_wrappers/include/aligned_malloc.h" |
| #include "webrtc/test/channel_transport/udp_socket2_win.h" |
| |
| namespace webrtc { |
| namespace test { |
| |
| uint32_t UdpSocket2ManagerWindows::_numOfActiveManagers = 0; |
| bool UdpSocket2ManagerWindows::_wsaInit = false; |
| |
| UdpSocket2ManagerWindows::UdpSocket2ManagerWindows() |
| : UdpSocketManager(), |
| _id(-1), |
| _stopped(false), |
| _init(false), |
| _pCrit(CriticalSectionWrapper::CreateCriticalSection()), |
| _ioCompletionHandle(NULL), |
| _numActiveSockets(0), |
| _event(EventWrapper::Create()) |
| { |
| _managerNumber = _numOfActiveManagers++; |
| |
| if(_numOfActiveManagers == 1) |
| { |
| WORD wVersionRequested = MAKEWORD(2, 2); |
| WSADATA wsaData; |
| _wsaInit = WSAStartup(wVersionRequested, &wsaData) == 0; |
| // TODO (hellner): seems safer to use RAII for this. E.g. what happens |
| // if a UdpSocket2ManagerWindows() created and destroyed |
| // without being initialized. |
| } |
| } |
| |
| UdpSocket2ManagerWindows::~UdpSocket2ManagerWindows() |
| { |
| WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| "UdpSocket2ManagerWindows(%d)::~UdpSocket2ManagerWindows()", |
| _managerNumber); |
| |
| if(_init) |
| { |
| _pCrit->Enter(); |
| if(_numActiveSockets) |
| { |
| _pCrit->Leave(); |
| _event->Wait(INFINITE); |
| } |
| else |
| { |
| _pCrit->Leave(); |
| } |
| StopWorkerThreads(); |
| |
| for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| iter != _workerThreadsList.end(); ++iter) { |
| delete *iter; |
| } |
| _workerThreadsList.clear(); |
| _ioContextPool.Free(); |
| |
| _numOfActiveManagers--; |
| if(_ioCompletionHandle) |
| { |
| CloseHandle(_ioCompletionHandle); |
| } |
| if (_numOfActiveManagers == 0) |
| { |
| if(_wsaInit) |
| { |
| WSACleanup(); |
| } |
| } |
| } |
| if(_pCrit) |
| { |
| delete _pCrit; |
| } |
| if(_event) |
| { |
| delete _event; |
| } |
| } |
| |
| bool UdpSocket2ManagerWindows::Init(int32_t id, |
| uint8_t& numOfWorkThreads) { |
| CriticalSectionScoped cs(_pCrit); |
| if ((_id != -1) || (_numOfWorkThreads != 0)) { |
| assert(_id != -1); |
| assert(_numOfWorkThreads != 0); |
| return false; |
| } |
| _id = id; |
| _numOfWorkThreads = numOfWorkThreads; |
| return true; |
| } |
| |
| bool UdpSocket2ManagerWindows::Start() |
| { |
| WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| "UdpSocket2ManagerWindows(%d)::Start()",_managerNumber); |
| if(!_init) |
| { |
| StartWorkerThreads(); |
| } |
| |
| if(!_init) |
| { |
| return false; |
| } |
| _pCrit->Enter(); |
| // Start worker threads. |
| _stopped = false; |
| int32_t error = 0; |
| for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| iter != _workerThreadsList.end() && !error; ++iter) { |
| if(!(*iter)->Start()) |
| error = 1; |
| } |
| if(error) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::Start() error starting worker\ |
| threads", |
| _managerNumber); |
| _pCrit->Leave(); |
| return false; |
| } |
| _pCrit->Leave(); |
| return true; |
| } |
| |
| bool UdpSocket2ManagerWindows::StartWorkerThreads() |
| { |
| if(!_init) |
| { |
| _pCrit->Enter(); |
| |
| _ioCompletionHandle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, |
| 0, 0); |
| if(_ioCompletionHandle == NULL) |
| { |
| int32_t error = GetLastError(); |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::StartWorkerThreads()" |
| "_ioCompletioHandle == NULL: error:%d", |
| _managerNumber,error); |
| _pCrit->Leave(); |
| return false; |
| } |
| |
| // Create worker threads. |
| uint32_t i = 0; |
| bool error = false; |
| while(i < _numOfWorkThreads && !error) |
| { |
| UdpSocket2WorkerWindows* pWorker = |
| new UdpSocket2WorkerWindows(_ioCompletionHandle); |
| if(pWorker->Init() != 0) |
| { |
| error = true; |
| delete pWorker; |
| break; |
| } |
| _workerThreadsList.push_front(pWorker); |
| i++; |
| } |
| if(error) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
| "creating work threads", |
| _managerNumber); |
| // Delete worker threads. |
| for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| iter != _workerThreadsList.end(); ++iter) { |
| delete *iter; |
| } |
| _workerThreadsList.clear(); |
| _pCrit->Leave(); |
| return false; |
| } |
| if(_ioContextPool.Init()) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::StartWorkerThreads() error " |
| "initiating _ioContextPool", |
| _managerNumber); |
| _pCrit->Leave(); |
| return false; |
| } |
| _init = true; |
| WEBRTC_TRACE( |
| kTraceDebug, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows::StartWorkerThreads %d number of work " |
| "threads created and initialized", |
| _numOfWorkThreads); |
| _pCrit->Leave(); |
| } |
| return true; |
| } |
| |
| bool UdpSocket2ManagerWindows::Stop() |
| { |
| WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| "UdpSocket2ManagerWindows(%d)::Stop()",_managerNumber); |
| |
| if(!_init) |
| { |
| return false; |
| } |
| _pCrit->Enter(); |
| _stopped = true; |
| if(_numActiveSockets) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::Stop() there is still active\ |
| sockets", |
| _managerNumber); |
| _pCrit->Leave(); |
| return false; |
| } |
| // No active sockets. Stop all worker threads. |
| bool result = StopWorkerThreads(); |
| _pCrit->Leave(); |
| return result; |
| } |
| |
| bool UdpSocket2ManagerWindows::StopWorkerThreads() |
| { |
| int32_t error = 0; |
| WEBRTC_TRACE( |
| kTraceDebug, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() Worker\ |
| threadsStoped, numActicve Sockets=%d", |
| _managerNumber, |
| _numActiveSockets); |
| |
| // Release all threads waiting for GetQueuedCompletionStatus(..). |
| if(_ioCompletionHandle) |
| { |
| uint32_t i = 0; |
| for(i = 0; i < _workerThreadsList.size(); i++) |
| { |
| PostQueuedCompletionStatus(_ioCompletionHandle, 0 ,0 , NULL); |
| } |
| } |
| for (WorkerList::iterator iter = _workerThreadsList.begin(); |
| iter != _workerThreadsList.end(); ++iter) { |
| if((*iter)->Stop() == false) |
| { |
| error = -1; |
| WEBRTC_TRACE(kTraceWarning, kTraceTransport, -1, |
| "failed to stop worker thread"); |
| } |
| } |
| |
| if(error) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::StopWorkerThreads() error stopping\ |
| worker threads", |
| _managerNumber); |
| return false; |
| } |
| return true; |
| } |
| |
| bool UdpSocket2ManagerWindows::AddSocketPrv(UdpSocket2Windows* s) |
| { |
| WEBRTC_TRACE(kTraceDebug, kTraceTransport, _id, |
| "UdpSocket2ManagerWindows(%d)::AddSocketPrv()",_managerNumber); |
| if(!_init) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::AddSocketPrv() manager not\ |
| initialized", |
| _managerNumber); |
| return false; |
| } |
| _pCrit->Enter(); |
| if(s == NULL) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket == NULL", |
| _managerNumber); |
| _pCrit->Leave(); |
| return false; |
| } |
| if(s->GetFd() == NULL || s->GetFd() == INVALID_SOCKET) |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::AddSocketPrv() socket->GetFd() ==\ |
| %d", |
| _managerNumber, |
| (int32_t)s->GetFd()); |
| _pCrit->Leave(); |
| return false; |
| |
| } |
| _ioCompletionHandle = CreateIoCompletionPort((HANDLE)s->GetFd(), |
| _ioCompletionHandle, |
| (ULONG_PTR)(s), 0); |
| if(_ioCompletionHandle == NULL) |
| { |
| int32_t error = GetLastError(); |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::AddSocketPrv() Error adding to IO\ |
| completion: %d", |
| _managerNumber, |
| error); |
| _pCrit->Leave(); |
| return false; |
| } |
| _numActiveSockets++; |
| _pCrit->Leave(); |
| return true; |
| } |
| bool UdpSocket2ManagerWindows::RemoveSocketPrv(UdpSocket2Windows* s) |
| { |
| if(!_init) |
| { |
| return false; |
| } |
| _pCrit->Enter(); |
| _numActiveSockets--; |
| if(_numActiveSockets == 0) |
| { |
| _event->Set(); |
| } |
| _pCrit->Leave(); |
| return true; |
| } |
| |
| PerIoContext* UdpSocket2ManagerWindows::PopIoContext() |
| { |
| if(!_init) |
| { |
| return NULL; |
| } |
| |
| PerIoContext* pIoC = NULL; |
| if(!_stopped) |
| { |
| pIoC = _ioContextPool.PopIoContext(); |
| }else |
| { |
| WEBRTC_TRACE( |
| kTraceError, |
| kTraceTransport, |
| _id, |
| "UdpSocket2ManagerWindows(%d)::PopIoContext() Manager Not started", |
| _managerNumber); |
| } |
| return pIoC; |
| } |
| |
| int32_t UdpSocket2ManagerWindows::PushIoContext(PerIoContext* pIoContext) |
| { |
| return _ioContextPool.PushIoContext(pIoContext); |
| } |
| |
| IoContextPool::IoContextPool() |
| : _pListHead(NULL), |
| _init(false), |
| _size(0), |
| _inUse(0) |
| { |
| } |
| |
| IoContextPool::~IoContextPool() |
| { |
| Free(); |
| assert(_size.Value() == 0); |
| AlignedFree(_pListHead); |
| } |
| |
| int32_t IoContextPool::Init(uint32_t /*increaseSize*/) |
| { |
| if(_init) |
| { |
| return 0; |
| } |
| |
| _pListHead = (PSLIST_HEADER)AlignedMalloc(sizeof(SLIST_HEADER), |
| MEMORY_ALLOCATION_ALIGNMENT); |
| if(_pListHead == NULL) |
| { |
| return -1; |
| } |
| InitializeSListHead(_pListHead); |
| _init = true; |
| return 0; |
| } |
| |
| PerIoContext* IoContextPool::PopIoContext() |
| { |
| if(!_init) |
| { |
| return NULL; |
| } |
| |
| PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
| if(pListEntry == NULL) |
| { |
| IoContextPoolItem* item = (IoContextPoolItem*) |
| AlignedMalloc( |
| sizeof(IoContextPoolItem), |
| MEMORY_ALLOCATION_ALIGNMENT); |
| if(item == NULL) |
| { |
| return NULL; |
| } |
| memset(&item->payload.ioContext,0,sizeof(PerIoContext)); |
| item->payload.base = item; |
| pListEntry = &(item->itemEntry); |
| ++_size; |
| } |
| ++_inUse; |
| return &((IoContextPoolItem*)pListEntry)->payload.ioContext; |
| } |
| |
| int32_t IoContextPool::PushIoContext(PerIoContext* pIoContext) |
| { |
| // TODO (hellner): Overlapped IO should be completed at this point. Perhaps |
| // add an assert? |
| const bool overlappedIOCompleted = HasOverlappedIoCompleted( |
| (LPOVERLAPPED)pIoContext); |
| |
| IoContextPoolItem* item = ((IoContextPoolItemPayload*)pIoContext)->base; |
| |
| const int32_t usedItems = --_inUse; |
| const int32_t totalItems = _size.Value(); |
| const int32_t freeItems = totalItems - usedItems; |
| if(freeItems < 0) |
| { |
| assert(false); |
| AlignedFree(item); |
| return -1; |
| } |
| if((freeItems >= totalItems>>1) && |
| overlappedIOCompleted) |
| { |
| AlignedFree(item); |
| --_size; |
| return 0; |
| } |
| InterlockedPushEntrySList(_pListHead, &(item->itemEntry)); |
| return 0; |
| } |
| |
| int32_t IoContextPool::Free() |
| { |
| if(!_init) |
| { |
| return 0; |
| } |
| |
| int32_t itemsFreed = 0; |
| PSLIST_ENTRY pListEntry = InterlockedPopEntrySList(_pListHead); |
| while(pListEntry != NULL) |
| { |
| IoContextPoolItem* item = ((IoContextPoolItem*)pListEntry); |
| AlignedFree(item); |
| --_size; |
| itemsFreed++; |
| pListEntry = InterlockedPopEntrySList(_pListHead); |
| } |
| return itemsFreed; |
| } |
| |
| int32_t UdpSocket2WorkerWindows::_numOfWorkers = 0; |
| |
| UdpSocket2WorkerWindows::UdpSocket2WorkerWindows(HANDLE ioCompletionHandle) |
| : _ioCompletionHandle(ioCompletionHandle), |
| _pThread(Run, this, "UdpSocket2ManagerWindows_thread"), |
| _init(false) { |
| _workerNumber = _numOfWorkers++; |
| WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
| "UdpSocket2WorkerWindows created"); |
| } |
| |
| UdpSocket2WorkerWindows::~UdpSocket2WorkerWindows() |
| { |
| WEBRTC_TRACE(kTraceMemory, kTraceTransport, -1, |
| "UdpSocket2WorkerWindows deleted"); |
| } |
| |
| bool UdpSocket2WorkerWindows::Start() |
| { |
| WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
| "Start UdpSocket2WorkerWindows"); |
| _pThread.Start(); |
| |
| _pThread.SetPriority(rtc::kRealtimePriority); |
| return true; |
| } |
| |
| bool UdpSocket2WorkerWindows::Stop() |
| { |
| WEBRTC_TRACE(kTraceStateInfo, kTraceTransport, -1, |
| "Stop UdpSocket2WorkerWindows"); |
| _pThread.Stop(); |
| return true; |
| } |
| |
| int32_t UdpSocket2WorkerWindows::Init() |
| { |
| _init = true; |
| return 0; |
| } |
| |
| bool UdpSocket2WorkerWindows::Run(void* obj) |
| { |
| UdpSocket2WorkerWindows* pWorker = |
| static_cast<UdpSocket2WorkerWindows*>(obj); |
| return pWorker->Process(); |
| } |
| |
| // Process should always return true. Stopping the worker threads is done in |
| // the UdpSocket2ManagerWindows::StopWorkerThreads() function. |
| bool UdpSocket2WorkerWindows::Process() |
| { |
| int32_t success = 0; |
| DWORD ioSize = 0; |
| UdpSocket2Windows* pSocket = NULL; |
| PerIoContext* pIOContext = 0; |
| OVERLAPPED* pOverlapped = 0; |
| success = GetQueuedCompletionStatus(_ioCompletionHandle, |
| &ioSize, |
| (ULONG_PTR*)&pSocket, &pOverlapped, 200); |
| |
| uint32_t error = 0; |
| if(!success) |
| { |
| error = GetLastError(); |
| if(error == WAIT_TIMEOUT) |
| { |
| return true; |
| } |
| // This may happen if e.g. PostQueuedCompletionStatus() has been called. |
| // The IO context still needs to be reclaimed or re-used which is done |
| // in UdpSocket2Windows::IOCompleted(..). |
| } |
| if(pSocket == NULL) |
| { |
| WEBRTC_TRACE( |
| kTraceDebug, |
| kTraceTransport, |
| -1, |
| "UdpSocket2WorkerWindows(%d)::Process(), pSocket == 0, end thread", |
| _workerNumber); |
| return true; |
| } |
| pIOContext = (PerIoContext*)pOverlapped; |
| pSocket->IOCompleted(pIOContext,ioSize,error); |
| return true; |
| } |
| |
| } // namespace test |
| } // namespace webrtc |