#include "stdafx.h" #include "SocketMgr.h" #include bool SocketMgr::s_bRunningCleanupThread = true; std::recursive_mutex SocketMgr::s_disconnectionQueueLock; std::queue SocketMgr::s_disconnectionQueue; Thread SocketMgr::s_cleanupThread; Atomic SocketMgr::s_refCounter; uint32 THREADCALL SocketCleanupThread(void * lpParam) { while (SocketMgr::s_bRunningCleanupThread) { SocketMgr::s_disconnectionQueueLock.lock(); while (!SocketMgr::s_disconnectionQueue.empty()) { Socket *pSock = SocketMgr::s_disconnectionQueue.front(); if (pSock->GetSocketMgr()) pSock->GetSocketMgr()->DisconnectCallback(pSock); SocketMgr::s_disconnectionQueue.pop(); } SocketMgr::s_disconnectionQueueLock.unlock(); sleep(100); } return 0; } SocketMgr::SocketMgr() : m_threadCount(0), m_bWorkerThreadsActive(false), m_bShutdown(false) { static bool bRefCounterInitialised = false; if (!bRefCounterInitialised) { s_refCounter = 0; bRefCounterInitialised = true; } IncRef(); Initialise(); } void SocketMgr::SpawnWorkerThreads() { if (m_bWorkerThreadsActive) return; #ifdef _WIN32 SYSTEM_INFO si; GetSystemInfo(&si); int threadcount = 1;//si.dwNumberOfProcessors * 2; #else // Linux: get number of processors int threadcount = 1; // For now, use single thread #endif m_bWorkerThreadsActive = true; for (int i = 0; i < threadcount; i++) m_threads.push_back(new Thread(SocketWorkerThread, this)); if (!s_cleanupThread.isStarted()) s_cleanupThread.start(SocketCleanupThread); } uint32 THREADCALL SocketMgr::SocketWorkerThread(void * lpParam) { SocketMgr *socketMgr = (SocketMgr *)lpParam; #ifdef _WIN32 HANDLE cp = socketMgr->GetCompletionPort(); DWORD len; Socket * s = nullptr; OverlappedStruct * ov = nullptr; LPOVERLAPPED ol_ptr; while (socketMgr->m_bWorkerThreadsActive) { #ifndef _WIN64 if(!GetQueuedCompletionStatus(cp, &len, (LPDWORD)&s, &ol_ptr, 10000)) #else if(!GetQueuedCompletionStatus(cp, &len, (PULONG_PTR)&s, &ol_ptr, 10000)) #endif continue; ov = CONTAINING_RECORD(ol_ptr, OverlappedStruct, m_overlap); if (ov->m_event == SOCKET_IO_THREAD_SHUTDOWN) { delete ov; return 0; } if (ov->m_event < NUM_SOCKET_IO_EVENTS) ophandlers[ov->m_event](s, len); //std::async(std::launch::async, ophandlers[ov->m_event], s, len); } #else // Linux implementation - simple event loop while (socketMgr->m_bWorkerThreadsActive) { // For Linux, we use epoll in ListenSocket, so this thread can just sleep usleep(100000); // 100ms } #endif return 0; } void SocketMgr::Initialise() { #ifdef _WIN32 m_completionPort = nullptr; #else m_completionPort = -1; #endif } void SocketMgr::CreateCompletionPort() { #ifdef _WIN32 SetCompletionPort(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, (ULONG_PTR)0, 0)); #else // Linux: create epoll instance SetCompletionPort(epoll_create1(0)); #endif } void SocketMgr::SetupWinsock() { #ifdef _WIN32 WSADATA wsaData; WSAStartup(MAKEWORD(2,0), &wsaData); #else // Linux: no socket initialization needed #endif } void HandleReadComplete(Socket * s, uint32 len) { if (s->IsDeleted()) return; s->m_readEvent.Unmark(); if (len) { s->GetReadBuffer().IncrementWritten(len); s->OnRead(); s->SetupReadEvent(); } else { s->Disconnect(); } } void HandleWriteComplete(Socket * s, uint32 len) { if (s->IsDeleted()) return; s->m_writeEvent.Unmark(); Guard lock(s->m_writeMutex);// Lock s->GetWriteBuffer().Remove(len); if(s->GetWriteBuffer().GetContiguousBytes() > 0) s->WriteCallback(); else s->DecSendLock(); } void HandleShutdown(Socket * s, uint32 len) {} void SocketMgr::OnConnect(Socket *pSock) {} void SocketMgr::DisconnectCallback(Socket *pSock) {} void SocketMgr::OnDisconnect(Socket *pSock) { Guard lock(s_disconnectionQueueLock); s_disconnectionQueue.push(pSock); } void SocketMgr::ShutdownThreads() { #ifdef _WIN32 OverlappedStruct * ov = new OverlappedStruct(SOCKET_IO_THREAD_SHUTDOWN); PostQueuedCompletionStatus(m_completionPort, 0, (ULONG_PTR)0, &ov->m_overlap); #endif m_bWorkerThreadsActive = false; printf("Waiting for worker socket threads to exit..."); foreach (itr, m_threads) { (*itr)->waitForExit(); delete (*itr); } printf(" exited.\n"); } void SocketMgr::Shutdown() { if (m_bShutdown) return; ShutdownThreads(); DecRef(); m_bShutdown = true; } void SocketMgr::SetupSockets() { SetupWinsock(); } void SocketMgr::CleanupSockets() { if (s_cleanupThread.isStarted()) { s_bRunningCleanupThread = false; s_cleanupThread.waitForExit(); } #ifdef _WIN32 WSACleanup(); #else // Linux: no socket cleanup needed #endif } SocketMgr::~SocketMgr() { Shutdown(); }