개발관련/C&C++

IOCP Socket Server 구현

Diademata 2018. 4. 16. 22:47
반응형

code : https://github.com/EomTaeWook/Cpp-Util/tree/master/Util/Socket


IOCPServerSocket.h


#pragma once

#include "NS.h"

#include "IOCPBaseServer.h"


NS_SOCKET_BEGIN

template<typename ProtocolType, typename ...Types>

class IOCPServerSocket : public IOCPBaseServer

{

protected:

IOCPServerSocket();

public:

virtual ~IOCPServerSocket();

protected:

std::map<ProtocolType, Util::Common::MulticastDelegate<void, Util::Socket::StateObject&, Types...>> _funcMaps;

public:

void BindCallback(ProtocolType protocol, std::function<void(Util::Socket::StateObject&, Types...)> callback);

void OnCallback(ProtocolType protocol, Util::Socket::StateObject& stateObject, Types...);

};

template<typename ProtocolType, typename ...Types>

IOCPServerSocket<ProtocolType, Types...>::IOCPServerSocket()

{

}

template<typename ProtocolType, typename ...Types>

IOCPServerSocket<ProtocolType, Types...>::~IOCPServerSocket()

{

}

template<typename ProtocolType, typename ...Types>

inline void IOCPServerSocket<ProtocolType, Types...>::BindCallback(ProtocolType protocol, std::function<void(Util::Socket::StateObject&, Types...)> callback)

{

if (_funcMaps.find(protocol) == _funcMaps.end())

_funcMaps.insert(std::pair<ProtocolType, Util::Common::MulticastDelegate<void, Util::Socket::StateObject&, Types...>>(protocol, std::move(callback)));

else

throw std::exception("An item with the same key has already been Added");

}

template<typename ProtocolType, typename ...Types>

inline void IOCPServerSocket<ProtocolType, Types...>::OnCallback(ProtocolType protocol, Util::Socket::StateObject& stateObject, Types... params)

{

try

{

auto it = _funcMaps.find(protocol);

if (it != _funcMaps.end())

{

it->second(std::forward<Util::Socket::StateObject&>(stateObject), std::forward<Types>(params)...);

}

}

catch (std::exception ex)

{

ex.what();

}

catch (...)

{

}

}

NS_SOCKET_END


IOCPBaseServer.h


#pragma once

#include "NS.h"

#include <WinSock2.h>

#include "StateObject.h"

#include <WS2tcpip.h>

#include <string>

#include <vector>

#include <memory>

#include "../Threading/Thread.h"

#include "SyncCount.h"

#include "IPacket.h"

#include <map>

#pragma comment(lib, "Ws2_32.lib")


NS_SOCKET_BEGIN

class IOCPBaseServer

{

private:

static const LONGLONG _CLOSE_THREAD = -1;

protected:

IOCPBaseServer();

public:

virtual ~IOCPBaseServer();

private:

std::unique_ptr<Util::Threading::Thread> _thread;

std::map<unsigned long, std::shared_ptr<StateObject>> _clients;

bool _isStart;

HANDLE _completionPort;

std::vector<HANDLE> _hWorkerThread;

sockaddr_in _iPEndPoint;

SyncCount _handleCount;

private:

SOCKET _listener;

Util::Threading::CriticalSection _remove;

Util::Threading::CriticalSection _read;

public:

void Start(std::string ip, int port);

void Stop();

private:

void BeginReceive(StateObject* pStateObject);

int Invoke();

void StartListening(void* pObj = nullptr);

void AddPeer(StateObject* pStateObject);

void ClosePeer(StateObject* pStateObject);

public:

void Init(UINT threadSize = 0);

protected:

//abstract Method

virtual void OnAccepted(Util::Socket::StateObject& stateObject) = 0;

virtual void OnDisconnected(unsigned long handle) = 0;

virtual void OnRecieved(Util::Socket::StateObject& stateObject) = 0;

//virtual

virtual void BroadCast(Util::Socket::IPacket& packet, StateObject state);

private:

static unsigned int __stdcall Run(void*);

};

inline IOCPBaseServer::IOCPBaseServer()

{

_completionPort = NULL;

_isStart = false;

}

inline IOCPBaseServer::~IOCPBaseServer()

{

Stop();

}

NS_SOCKET_END


IOCPBaseServer.cpp


#include "IOCPBaseServer.h"

#include "..\Common\Finally.h"

#include <iostream>


NS_SOCKET_BEGIN

void IOCPBaseServer::Stop()

{

_isStart = false;


for (auto it = _clients.begin(); it != _clients.end(); ++it)

{

it->second->Close();

it->second.reset();

}

_clients.clear();


for (size_t i = 0; i < _hWorkerThread.size(); i++)

PostQueuedCompletionStatus(_completionPort, 0, _CLOSE_THREAD, NULL);

for (size_t i = 0; i < _hWorkerThread.size(); i++)

{

WaitForSingleObject(_hWorkerThread[i], INFINITE);

CloseHandle(_hWorkerThread[i]);

}

closesocket(_listener);

WSACleanup();

_listener = NULL;

CloseHandle(_completionPort);

_completionPort = NULL;

_hWorkerThread.clear();

}

void IOCPBaseServer::Init(UINT threadSize)

{

_completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

if (_completionPort == INVALID_HANDLE_VALUE)

throw std::exception("CreateIoCompletionPort Fail");


SYSTEM_INFO info;

GetSystemInfo(&info);

if (threadSize > 0)

{

if (info.dwNumberOfProcessors * 2 < threadSize)

threadSize = info.dwNumberOfProcessors * 2;

else

threadSize = info.dwNumberOfProcessors * 2;

}

else

threadSize = info.dwNumberOfProcessors * 2;

for (size_t i = 0; i < threadSize; i++)

_hWorkerThread.push_back((HANDLE)_beginthreadex(0, 0, Run, this, 0, NULL));

}

void IOCPBaseServer::Start(std::string ip, int port)

{

if (_isStart) return;

try

{

if (_completionPort == NULL)

Init();

WSADATA _wsaData;

if (WSAStartup(MAKEWORD(2, 2), &_wsaData) != 0)

throw std::exception();

if (_thread.get() == NULL)

{

_isStart = true;

memset(&_iPEndPoint, 0, sizeof(_iPEndPoint));

if (inet_pton(PF_INET, ip.c_str(), &_iPEndPoint) != 1)

{

throw std::exception();

}

_iPEndPoint.sin_family = PF_INET;

_iPEndPoint.sin_port = htons(port);

_listener = WSASocketW(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

if (_listener == INVALID_SOCKET)

{

auto error = GetLastError();

Stop();

throw std::exception("Listener Create Error : " + error);

}


//accept시 NonBlock

//unsigned long mode = 1;

//ioctlsocket(_listener, FIONBIO, &mode);

int option = 1;

setsockopt(_listener, SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option));

if (::bind(_listener, (SOCKADDR*)&_iPEndPoint, sizeof(_iPEndPoint)) == SOCKET_ERROR)

{

auto error = GetLastError();

Stop();

throw std::exception("BindException : " + error);

}

if (listen(_listener, 100) == SOCKET_ERROR)

{

auto error = GetLastError();

Stop();

throw std::exception("ListenExcption : " + error);

}

_thread = std::make_unique<Threading::Thread>(std::bind(&IOCPBaseServer::StartListening, this, nullptr));

_thread->Start();

}

}

catch (...)

{

_isStart = false;

throw std::exception("Server Start Fail");

}

}


void IOCPBaseServer::StartListening(void* pObj)

{

while (_isStart)

{

SOCKADDR_IN clientAddr;

int size = sizeof(clientAddr);

memset(&clientAddr, 0, size);

SOCKET handler = accept(_listener, (SOCKADDR*)&clientAddr, &size);

if (handler == INVALID_SOCKET)

continue;

auto stateObject = new StateObject();

stateObject->Socket() = handler;

std::memcpy(&stateObject->SocketAddr(), &clientAddr, size);

stateObject->Handle() = _handleCount.Add();

AddPeer(stateObject);

OnAccepted(*stateObject);

CreateIoCompletionPort((HANDLE)stateObject->Socket(), _completionPort, (ULONG_PTR)stateObject, 0);

BeginReceive(stateObject);

}

}

void IOCPBaseServer::BeginReceive(Socket::StateObject* pStateObject)

{

DWORD flags = 0;

if (WSARecv(pStateObject->Socket(), &pStateObject->WSABuff(), 1, 0, &flags, (LPWSAOVERLAPPED)(&pStateObject->ReceiveOverlapped()), NULL) == SOCKET_ERROR)

{

int error = WSAGetLastError();

if (error != WSA_IO_PENDING)

ClosePeer(pStateObject);

}

}

int IOCPBaseServer::Invoke()

{

unsigned long bytesTrans = 0;

ULONG_PTR stateObject = 0;

Socket::Overlapped* overlapped;

while (true)

{

if (!GetQueuedCompletionStatus(_completionPort, &bytesTrans, &stateObject, (LPOVERLAPPED *)&overlapped, INFINITE))

{

int error = WSAGetLastError();

if (error != ERROR_NETNAME_DELETED)

{

printf("Error : %d", error);

break;

}

}

if ((LONG_PTR)stateObject == _CLOSE_THREAD && bytesTrans == 0)

break;

auto pHandler = reinterpret_cast<StateObject*>(stateObject);

if (bytesTrans == 0)

{

ClosePeer(pHandler);

continue;

}

if (overlapped->mode == Util::Socket::Mode::Receive)

{

pHandler->ReceiveBuffer().Append(pHandler->WSABuff().buf, bytesTrans);

try

{

OnRecieved(*pHandler);

}

catch (std::exception ex)

{

printf("%s", ex.what());

}

BeginReceive(pHandler);

}

}

return 0;

}

void IOCPBaseServer::AddPeer(StateObject* pStateObject)

{

auto finally = Common::Finally(std::bind(&Threading::CriticalSection::LeaveCriticalSection, &_read));

try

{

_read.EnterCriticalSection();

auto it = _clients.find(pStateObject->Handle());

if (it != _clients.end())

{

it->second.get()->Close();

it->second.reset();

_clients.erase(pStateObject->Handle());

}

auto _pStateObject = std::make_shared<StateObject>(*pStateObject);

_clients.insert(std::make_pair(_pStateObject->Handle(), std::move(_pStateObject)));

}

catch (...)

{

}

}

void IOCPBaseServer::ClosePeer(StateObject* pStateObject)

{

auto finally = Common::Finally(std::bind(&Threading::CriticalSection::LeaveCriticalSection, &_remove));

try

{

_remove.EnterCriticalSection();

auto handle = pStateObject->Handle();

auto it = _clients.find(handle);

if (it != _clients.end())

{

it->second.get()->Close();

it->second.reset();

_clients.erase(it);

OnDisconnected(handle);

}

else

{

if (pStateObject->Socket() != NULL)

{

pStateObject->Close();

delete pStateObject;

pStateObject = nullptr;

}

}

}

catch (...)

{

}

}

void IOCPBaseServer::BroadCast(Util::Socket::IPacket& packet, StateObject state)

{

}

unsigned int __stdcall IOCPBaseServer::Run(void* obj)

{

auto server = reinterpret_cast<IOCPBaseServer*>(obj);

return server->Invoke();

}

NS_SOCKET_END


StateObject.h


#pragma once

#include "NS.h"

#include <WinSock2.h>

#include <memory>

#include "DefaultPacket.h"

#include "../Collections/SyncQueue.h"

#include "Overlapped.h"


NS_SOCKET_BEGIN

class StateObject

{

public:

StateObject();

virtual ~StateObject();

private:

static const int BUFF_SIZE = 4096;

private:

SOCKET _sock;

SOCKADDR_IN _addr;

Overlapped _receiveOverlapped;

Overlapped _sendOverlapped;

WSABUF _wsaBuf;

char _buffer[BUFF_SIZE];

unsigned long _handle;

Util::Collections::SyncQueue<char> _receiveBuffer;

Util::Collections::SyncQueue<Util::Socket::IPacket*> _receivePacketBuffer;

public:

SOCKET & Socket();

SOCKADDR_IN& SocketAddr();

Overlapped& ReceiveOverlapped();

WSABUF& WSABuff();

unsigned long& Handle();

Util::Collections::SyncQueue<char>& ReceiveBuffer();

Util::Collections::SyncQueue<Util::Socket::IPacket*>& ReceivePacketBuffer();

public:

void Close();

void Send(Util::Socket::IPacket& packet);

};


inline StateObject::StateObject()

{

memset(&_receiveOverlapped, 0, sizeof(Overlapped));


memset(&_sendOverlapped, 0, sizeof(Overlapped));

_sendOverlapped.mode = Socket::Mode::Send;


_wsaBuf.len = BUFF_SIZE;

_wsaBuf.buf = _buffer;

_sock = NULL;

}

inline StateObject::~StateObject()

{

Close();

}


inline Util::Collections::SyncQueue<Util::Socket::IPacket*>& StateObject::ReceivePacketBuffer()

{

return _receivePacketBuffer;

}

inline Util::Collections::SyncQueue<char>& StateObject::ReceiveBuffer()

{

return _receiveBuffer;

}

inline void StateObject::Send(Util::Socket::IPacket& packet)

{

WSABUF wsaBuf;

ULONG size = 0;

char* buffer = nullptr;

packet.GetBytes(&buffer, &size);

if (buffer != nullptr)

{

wsaBuf.buf = buffer;

wsaBuf.len = size;

if (WSASend(_sock, &wsaBuf, 1, NULL, 0, (LPWSAOVERLAPPED)&_sendOverlapped, NULL) == SOCKET_ERROR)

{

if (WSAGetLastError() != WSA_IO_PENDING)

Close();

}

}

}

inline void StateObject::Close()

{

_receiveBuffer.Clear();

_receivePacketBuffer.Clear();

closesocket(_sock);

_sock = NULL;

}

inline SOCKET& StateObject::Socket()

{

return _sock;

}

inline SOCKADDR_IN& StateObject::SocketAddr()

{

return _addr;

}


inline Overlapped& StateObject::ReceiveOverlapped()

{

return _receiveOverlapped;

}


inline WSABUF& StateObject::WSABuff()

{

return _wsaBuf;

}

inline unsigned long& StateObject::Handle()

{

return _handle;

}

NS_SOCKET_END

반응형

'개발관련 > C&C++' 카테고리의 다른 글

MSMQ(MS MessageQueue)  (0) 2018.05.15
Functional 이용한 델리게이트  (0) 2018.04.26
StringFormat  (0) 2018.03.20
라이브러리 빌드 전/후 이벤트 명령어  (0) 2018.03.16
C#처럼 이벤트 처리  (0) 2018.02.27