code : https://github.com/EomTaeWook/Cpp-Util/tree/master/Util/Socket
IOCPServerSocket.h
#pragma once
#include "NS.h"
#include "IOCPBaseServer.h"
NS_SOCKET_BEGIN
template<typename ProtocolType, typename ...Types>
class IOCPServerSocket : public IOCPBaseServer
{
protected:
IOCPServerSocket();
public:
virtual ~IOCPServerSocket();
protected:
std::map<ProtocolType, Util::Common::MulticastDelegate<void, Util::Socket::StateObject&, Types...>> _funcMaps;
public:
void BindCallback(ProtocolType protocol, std::function<void(Util::Socket::StateObject&, Types...)> callback);
void OnCallback(ProtocolType protocol, Util::Socket::StateObject& stateObject, Types...);
};
template<typename ProtocolType, typename ...Types>
IOCPServerSocket<ProtocolType, Types...>::IOCPServerSocket()
{
}
template<typename ProtocolType, typename ...Types>
IOCPServerSocket<ProtocolType, Types...>::~IOCPServerSocket()
{
}
template<typename ProtocolType, typename ...Types>
inline void IOCPServerSocket<ProtocolType, Types...>::BindCallback(ProtocolType protocol, std::function<void(Util::Socket::StateObject&, Types...)> callback)
{
if (_funcMaps.find(protocol) == _funcMaps.end())
_funcMaps.insert(std::pair<ProtocolType, Util::Common::MulticastDelegate<void, Util::Socket::StateObject&, Types...>>(protocol, std::move(callback)));
else
throw std::exception("An item with the same key has already been Added");
}
template<typename ProtocolType, typename ...Types>
inline void IOCPServerSocket<ProtocolType, Types...>::OnCallback(ProtocolType protocol, Util::Socket::StateObject& stateObject, Types... params)
{
try
{
auto it = _funcMaps.find(protocol);
if (it != _funcMaps.end())
{
it->second(std::forward<Util::Socket::StateObject&>(stateObject), std::forward<Types>(params)...);
}
}
catch (std::exception ex)
{
ex.what();
}
catch (...)
{
}
}
NS_SOCKET_END
IOCPBaseServer.h
#pragma once
#include "NS.h"
#include <WinSock2.h>
#include "StateObject.h"
#include <WS2tcpip.h>
#include <string>
#include <vector>
#include <memory>
#include "../Threading/Thread.h"
#include "SyncCount.h"
#include "IPacket.h"
#include <map>
#pragma comment(lib, "Ws2_32.lib")
NS_SOCKET_BEGIN
class IOCPBaseServer
{
private:
static const LONGLONG _CLOSE_THREAD = -1;
protected:
IOCPBaseServer();
public:
virtual ~IOCPBaseServer();
private:
std::unique_ptr<Util::Threading::Thread> _thread;
std::map<unsigned long, std::shared_ptr<StateObject>> _clients;
bool _isStart;
HANDLE _completionPort;
std::vector<HANDLE> _hWorkerThread;
sockaddr_in _iPEndPoint;
SyncCount _handleCount;
private:
SOCKET _listener;
Util::Threading::CriticalSection _remove;
Util::Threading::CriticalSection _read;
public:
void Start(std::string ip, int port);
void Stop();
private:
void BeginReceive(StateObject* pStateObject);
int Invoke();
void StartListening(void* pObj = nullptr);
void AddPeer(StateObject* pStateObject);
void ClosePeer(StateObject* pStateObject);
public:
void Init(UINT threadSize = 0);
protected:
//abstract Method
virtual void OnAccepted(Util::Socket::StateObject& stateObject) = 0;
virtual void OnDisconnected(unsigned long handle) = 0;
virtual void OnRecieved(Util::Socket::StateObject& stateObject) = 0;
//virtual
virtual void BroadCast(Util::Socket::IPacket& packet, StateObject state);
private:
static unsigned int __stdcall Run(void*);
};
inline IOCPBaseServer::IOCPBaseServer()
{
_completionPort = NULL;
_isStart = false;
}
inline IOCPBaseServer::~IOCPBaseServer()
{
Stop();
}
NS_SOCKET_END
IOCPBaseServer.cpp
#include "IOCPBaseServer.h"
#include "..\Common\Finally.h"
#include <iostream>
NS_SOCKET_BEGIN
void IOCPBaseServer::Stop()
{
_isStart = false;
for (auto it = _clients.begin(); it != _clients.end(); ++it)
{
it->second->Close();
it->second.reset();
}
_clients.clear();
for (size_t i = 0; i < _hWorkerThread.size(); i++)
PostQueuedCompletionStatus(_completionPort, 0, _CLOSE_THREAD, NULL);
for (size_t i = 0; i < _hWorkerThread.size(); i++)
{
WaitForSingleObject(_hWorkerThread[i], INFINITE);
CloseHandle(_hWorkerThread[i]);
}
closesocket(_listener);
WSACleanup();
_listener = NULL;
CloseHandle(_completionPort);
_completionPort = NULL;
_hWorkerThread.clear();
}
void IOCPBaseServer::Init(UINT threadSize)
{
_completionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (_completionPort == INVALID_HANDLE_VALUE)
throw std::exception("CreateIoCompletionPort Fail");
SYSTEM_INFO info;
GetSystemInfo(&info);
if (threadSize > 0)
{
if (info.dwNumberOfProcessors * 2 < threadSize)
threadSize = info.dwNumberOfProcessors * 2;
else
threadSize = info.dwNumberOfProcessors * 2;
}
else
threadSize = info.dwNumberOfProcessors * 2;
for (size_t i = 0; i < threadSize; i++)
_hWorkerThread.push_back((HANDLE)_beginthreadex(0, 0, Run, this, 0, NULL));
}
void IOCPBaseServer::Start(std::string ip, int port)
{
if (_isStart) return;
try
{
if (_completionPort == NULL)
Init();
WSADATA _wsaData;
if (WSAStartup(MAKEWORD(2, 2), &_wsaData) != 0)
throw std::exception();
if (_thread.get() == NULL)
{
_isStart = true;
memset(&_iPEndPoint, 0, sizeof(_iPEndPoint));
if (inet_pton(PF_INET, ip.c_str(), &_iPEndPoint) != 1)
{
throw std::exception();
}
_iPEndPoint.sin_family = PF_INET;
_iPEndPoint.sin_port = htons(port);
_listener = WSASocketW(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
if (_listener == INVALID_SOCKET)
{
auto error = GetLastError();
Stop();
throw std::exception("Listener Create Error : " + error);
}
//accept시 NonBlock
//unsigned long mode = 1;
//ioctlsocket(_listener, FIONBIO, &mode);
int option = 1;
setsockopt(_listener, SOL_SOCKET, SO_REUSEADDR, (char*)&option, sizeof(option));
if (::bind(_listener, (SOCKADDR*)&_iPEndPoint, sizeof(_iPEndPoint)) == SOCKET_ERROR)
{
auto error = GetLastError();
Stop();
throw std::exception("BindException : " + error);
}
if (listen(_listener, 100) == SOCKET_ERROR)
{
auto error = GetLastError();
Stop();
throw std::exception("ListenExcption : " + error);
}
_thread = std::make_unique<Threading::Thread>(std::bind(&IOCPBaseServer::StartListening, this, nullptr));
_thread->Start();
}
}
catch (...)
{
_isStart = false;
throw std::exception("Server Start Fail");
}
}
void IOCPBaseServer::StartListening(void* pObj)
{
while (_isStart)
{
SOCKADDR_IN clientAddr;
int size = sizeof(clientAddr);
memset(&clientAddr, 0, size);
SOCKET handler = accept(_listener, (SOCKADDR*)&clientAddr, &size);
if (handler == INVALID_SOCKET)
continue;
auto stateObject = new StateObject();
stateObject->Socket() = handler;
std::memcpy(&stateObject->SocketAddr(), &clientAddr, size);
stateObject->Handle() = _handleCount.Add();
AddPeer(stateObject);
OnAccepted(*stateObject);
CreateIoCompletionPort((HANDLE)stateObject->Socket(), _completionPort, (ULONG_PTR)stateObject, 0);
BeginReceive(stateObject);
}
}
void IOCPBaseServer::BeginReceive(Socket::StateObject* pStateObject)
{
DWORD flags = 0;
if (WSARecv(pStateObject->Socket(), &pStateObject->WSABuff(), 1, 0, &flags, (LPWSAOVERLAPPED)(&pStateObject->ReceiveOverlapped()), NULL) == SOCKET_ERROR)
{
int error = WSAGetLastError();
if (error != WSA_IO_PENDING)
ClosePeer(pStateObject);
}
}
int IOCPBaseServer::Invoke()
{
unsigned long bytesTrans = 0;
ULONG_PTR stateObject = 0;
Socket::Overlapped* overlapped;
while (true)
{
if (!GetQueuedCompletionStatus(_completionPort, &bytesTrans, &stateObject, (LPOVERLAPPED *)&overlapped, INFINITE))
{
int error = WSAGetLastError();
if (error != ERROR_NETNAME_DELETED)
{
printf("Error : %d", error);
break;
}
}
if ((LONG_PTR)stateObject == _CLOSE_THREAD && bytesTrans == 0)
break;
auto pHandler = reinterpret_cast<StateObject*>(stateObject);
if (bytesTrans == 0)
{
ClosePeer(pHandler);
continue;
}
if (overlapped->mode == Util::Socket::Mode::Receive)
{
pHandler->ReceiveBuffer().Append(pHandler->WSABuff().buf, bytesTrans);
try
{
OnRecieved(*pHandler);
}
catch (std::exception ex)
{
printf("%s", ex.what());
}
BeginReceive(pHandler);
}
}
return 0;
}
void IOCPBaseServer::AddPeer(StateObject* pStateObject)
{
auto finally = Common::Finally(std::bind(&Threading::CriticalSection::LeaveCriticalSection, &_read));
try
{
_read.EnterCriticalSection();
auto it = _clients.find(pStateObject->Handle());
if (it != _clients.end())
{
it->second.get()->Close();
it->second.reset();
_clients.erase(pStateObject->Handle());
}
auto _pStateObject = std::make_shared<StateObject>(*pStateObject);
_clients.insert(std::make_pair(_pStateObject->Handle(), std::move(_pStateObject)));
}
catch (...)
{
}
}
void IOCPBaseServer::ClosePeer(StateObject* pStateObject)
{
auto finally = Common::Finally(std::bind(&Threading::CriticalSection::LeaveCriticalSection, &_remove));
try
{
_remove.EnterCriticalSection();
auto handle = pStateObject->Handle();
auto it = _clients.find(handle);
if (it != _clients.end())
{
it->second.get()->Close();
it->second.reset();
_clients.erase(it);
OnDisconnected(handle);
}
else
{
if (pStateObject->Socket() != NULL)
{
pStateObject->Close();
delete pStateObject;
pStateObject = nullptr;
}
}
}
catch (...)
{
}
}
void IOCPBaseServer::BroadCast(Util::Socket::IPacket& packet, StateObject state)
{
}
unsigned int __stdcall IOCPBaseServer::Run(void* obj)
{
auto server = reinterpret_cast<IOCPBaseServer*>(obj);
return server->Invoke();
}
NS_SOCKET_END
StateObject.h
#pragma once
#include "NS.h"
#include <WinSock2.h>
#include <memory>
#include "DefaultPacket.h"
#include "../Collections/SyncQueue.h"
#include "Overlapped.h"
NS_SOCKET_BEGIN
class StateObject
{
public:
StateObject();
virtual ~StateObject();
private:
static const int BUFF_SIZE = 4096;
private:
SOCKET _sock;
SOCKADDR_IN _addr;
Overlapped _receiveOverlapped;
Overlapped _sendOverlapped;
WSABUF _wsaBuf;
char _buffer[BUFF_SIZE];
unsigned long _handle;
Util::Collections::SyncQueue<char> _receiveBuffer;
Util::Collections::SyncQueue<Util::Socket::IPacket*> _receivePacketBuffer;
public:
SOCKET & Socket();
SOCKADDR_IN& SocketAddr();
Overlapped& ReceiveOverlapped();
WSABUF& WSABuff();
unsigned long& Handle();
Util::Collections::SyncQueue<char>& ReceiveBuffer();
Util::Collections::SyncQueue<Util::Socket::IPacket*>& ReceivePacketBuffer();
public:
void Close();
void Send(Util::Socket::IPacket& packet);
};
inline StateObject::StateObject()
{
memset(&_receiveOverlapped, 0, sizeof(Overlapped));
memset(&_sendOverlapped, 0, sizeof(Overlapped));
_sendOverlapped.mode = Socket::Mode::Send;
_wsaBuf.len = BUFF_SIZE;
_wsaBuf.buf = _buffer;
_sock = NULL;
}
inline StateObject::~StateObject()
{
Close();
}
inline Util::Collections::SyncQueue<Util::Socket::IPacket*>& StateObject::ReceivePacketBuffer()
{
return _receivePacketBuffer;
}
inline Util::Collections::SyncQueue<char>& StateObject::ReceiveBuffer()
{
return _receiveBuffer;
}
inline void StateObject::Send(Util::Socket::IPacket& packet)
{
WSABUF wsaBuf;
ULONG size = 0;
char* buffer = nullptr;
packet.GetBytes(&buffer, &size);
if (buffer != nullptr)
{
wsaBuf.buf = buffer;
wsaBuf.len = size;
if (WSASend(_sock, &wsaBuf, 1, NULL, 0, (LPWSAOVERLAPPED)&_sendOverlapped, NULL) == SOCKET_ERROR)
{
if (WSAGetLastError() != WSA_IO_PENDING)
Close();
}
}
}
inline void StateObject::Close()
{
_receiveBuffer.Clear();
_receivePacketBuffer.Clear();
closesocket(_sock);
_sock = NULL;
}
inline SOCKET& StateObject::Socket()
{
return _sock;
}
inline SOCKADDR_IN& StateObject::SocketAddr()
{
return _addr;
}
inline Overlapped& StateObject::ReceiveOverlapped()
{
return _receiveOverlapped;
}
inline WSABUF& StateObject::WSABuff()
{
return _wsaBuf;
}
inline unsigned long& StateObject::Handle()
{
return _handle;
}
NS_SOCKET_END
'개발관련 > C&C++' 카테고리의 다른 글
MSMQ(MS MessageQueue) (0) | 2018.05.15 |
---|---|
Functional 이용한 델리게이트 (0) | 2018.04.26 |
StringFormat (0) | 2018.03.20 |
라이브러리 빌드 전/후 이벤트 명령어 (0) | 2018.03.16 |
C#처럼 이벤트 처리 (0) | 2018.02.27 |